# Source: AkademikForge-V1 / fusion_pipeline.py (Hugging Face upload by Syahdewo, commit 3be0066 verified, "Upload 7 files")
# fusion_pipeline.py
# ========== ENV PATCH — MUST RUN BEFORE ANY ML IMPORT ==========
import os

# Disable every TensorFlow / Keras code path inside transformers,
# plus MLflow integration and telemetry, before those libraries load.
_ENV_OVERRIDES = {
    "TF_CPP_MIN_LOG_LEVEL": "3",
    "TF_ENABLE_ONEDNN_OPTS": "0",
    "TRANSFORMERS_NO_TF": "1",
    "TRANSFORMERS_NO_TF2": "1",
    "USE_TF": "0",
    "DISABLE_MLFLOW_INTEGRATION": "1",
    "DISABLE_TELEMETRY": "1",
}
os.environ.update(_ENV_OVERRIDES)
# ===============================================================
import sys
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
from sentence_transformers import SentenceTransformer
from typing import Dict, Tuple, Optional
import logging
from pathlib import Path
import warnings
# Suppress warnings umum
warnings.filterwarnings("ignore")
# Setup logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("fusion_pipeline.log", encoding="utf-8"),
logging.StreamHandler(sys.stdout),
],
)
logger = logging.getLogger(__name__)
class AkademikForgeFusion:
    """3-model fusion pipeline for plagiarism detection.

    The models are combined by weighted score fusion:
    - BGE-M3: multi-lingual embedding (mean-pooled last hidden state)
    - SBERT Multilingual: semantic-similarity sentence embedding
    - IndoBERT Fine-Tuned: plagiarism classifier backbone ([CLS] embedding)
    """

    # Weight keys compare() reads; enforced on every weight change so a bad
    # update_weights() call fails immediately instead of raising KeyError later.
    _REQUIRED_WEIGHT_KEYS = ("bge_m3", "sbert", "indobert")

    def __init__(
        self,
        device: Optional[str] = None,
        weights: Optional[Dict[str, float]] = None,
        cache_dir: str = "Models",
    ):
        """Configure device/weights/thresholds and load all three models.

        Args:
            device: torch device string; auto-detects CUDA when None.
            weights: fusion weights keyed by _REQUIRED_WEIGHT_KEYS, summing
                to ~1.0; sensible defaults are used when None.
            cache_dir: local directory holding the three model folders.

        Raises:
            FileNotFoundError: if cache_dir or a model folder is missing.
            ValueError: if weights are missing keys or do not sum to ~1.0.
        """
        # Device configuration
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        logger.info(f"πŸ”§ Using device: {self.device}")
        # Fusion weights (tunable via update_weights)
        self.weights = weights or {
            "bge_m3": 0.35,
            "sbert": 0.35,
            "indobert": 0.30,
        }
        self._validate_weights()
        # Score thresholds for labeling (tunable via update_thresholds)
        self.thresholds = {
            "high": 0.85,  # high plagiarism
            "medium": 0.70,  # similar / paraphrase
            "low": 0.0,  # unique
        }
        # Cache directory validation
        self.cache_dir = Path(cache_dir)
        if not self.cache_dir.exists():
            raise FileNotFoundError(
                f"❌ Model directory '{cache_dir}' tidak ditemukan!"
            )
        # Load models (raises on any failure)
        self._load_models()
        logger.info("βœ… Pipeline initialization completed!\n")

    def _validate_weights(self):
        """Check that self.weights has the required keys and sums to ~1.0."""
        missing = [k for k in self._REQUIRED_WEIGHT_KEYS if k not in self.weights]
        if missing:
            raise ValueError(f"❌ Missing weight keys: {missing}")
        total = sum(self.weights.values())
        # Small tolerance for float rounding.
        if not (0.99 <= total <= 1.01):
            raise ValueError(f"❌ Total weights harus = 1.0, got {total}")

    def _load_models(self):
        """Load all three models from local folders, with error handling.

        Raises:
            FileNotFoundError: if a model folder is absent.
            Exception: re-raised after logging if any model fails to load.
        """
        try:
            # 1. BGE-M3 (raw transformers AutoModel, offline only)
            logger.info("πŸš€ Loading BGE-M3...")
            bge_path = self.cache_dir / "bge-m3"
            if not bge_path.exists():
                raise FileNotFoundError(f"BGE-M3 not found at {bge_path}")
            self.bge_tokenizer = AutoTokenizer.from_pretrained(
                str(bge_path),
                local_files_only=True,
            )
            self.bge_model = AutoModel.from_pretrained(
                str(bge_path),
                local_files_only=True,
            ).to(self.device)
            self.bge_model.eval()
            logger.info("βœ“ BGE-M3 loaded")
            # 2. SBERT Multilingual (sentence-transformers wrapper)
            logger.info("πŸš€ Loading SBERT Multilingual...")
            sbert_path = self.cache_dir / "sbert-multilingual"
            if not sbert_path.exists():
                raise FileNotFoundError(f"SBERT not found at {sbert_path}")
            self.sbert = SentenceTransformer(
                str(sbert_path),
                device=self.device,
            )
            logger.info("βœ“ SBERT Multilingual loaded")
            # 3. IndoBERT Fine-Tuned (raw transformers AutoModel, offline only)
            logger.info("πŸš€ Loading IndoBERT Fine-Tuned...")
            indo_path = self.cache_dir / "indobert-finetuned"
            if not indo_path.exists():
                raise FileNotFoundError(f"IndoBERT not found at {indo_path}")
            self.indo_tokenizer = AutoTokenizer.from_pretrained(
                str(indo_path),
                local_files_only=True,
            )
            self.indo_model = AutoModel.from_pretrained(
                str(indo_path),
                local_files_only=True,
            ).to(self.device)
            self.indo_model.eval()
            logger.info("βœ“ IndoBERT Fine-Tuned loaded")
        except Exception as e:
            logger.error(f"❌ Error loading models: {str(e)}")
            raise

    def _validate_text(self, text: str, name: str = "text") -> str:
        """Validate and normalize one input text.

        Returns the stripped text; warns (but does not fail) when it is
        shorter than 10 characters, since results may then be unreliable.

        Raises:
            TypeError: if text is not a string.
            ValueError: if text is empty after stripping.
        """
        if not isinstance(text, str):
            raise TypeError(f"{name} harus berupa string, got {type(text)}")
        text = text.strip()
        if not text:
            raise ValueError(f"{name} tidak boleh kosong")
        if len(text) < 10:
            logger.warning(
                f"⚠️ {name} terlalu pendek (<10 karakter), hasil mungkin tidak akurat"
            )
        return text

    @torch.no_grad()
    def embed_bge(self, text: str) -> torch.Tensor:
        """Return the BGE-M3 embedding (mean pooling over tokens), on CPU."""
        try:
            tokens = self.bge_tokenizer(
                text,
                return_tensors="pt",
                truncation=True,
                padding=True,
                max_length=512,
            ).to(self.device)
            output = self.bge_model(**tokens)
            # Mean over the sequence dimension -> (1, hidden_dim)
            emb = output.last_hidden_state.mean(dim=1)
            return emb.cpu()
        except Exception as e:
            logger.error(f"❌ Error in BGE embedding: {str(e)}")
            raise

    @torch.no_grad()
    def embed_indobert(self, text: str) -> torch.Tensor:
        """Return the IndoBERT [CLS]-token embedding, on CPU."""
        try:
            tokens = self.indo_tokenizer(
                text,
                return_tensors="pt",
                truncation=True,
                padding=True,
                max_length=512,
            ).to(self.device)
            output = self.indo_model(**tokens)
            # [CLS] token representation -> (1, hidden_dim)
            emb = output.last_hidden_state[:, 0, :]
            return emb.cpu()
        except Exception as e:
            logger.error(f"❌ Error in IndoBERT embedding: {str(e)}")
            raise

    def embed_sbert(self, text: str) -> torch.Tensor:
        """Return the SBERT sentence embedding, on CPU."""
        try:
            emb = self.sbert.encode(
                text,
                convert_to_tensor=True,
                show_progress_bar=False,
            )
            return emb.cpu()
        except Exception as e:
            logger.error(f"❌ Error in SBERT embedding: {str(e)}")
            raise

    def cosine_similarity(self, a: torch.Tensor, b: torch.Tensor) -> float:
        """Cosine similarity between two embeddings, clamped to [0, 1].

        F.cosine_similarity normalizes internally, so no explicit
        F.normalize pass is needed beforehand.
        """
        similarity = F.cosine_similarity(a, b, dim=-1).item()
        return max(0.0, min(1.0, similarity))

    def _get_label(self, score: float) -> Tuple[str, str]:
        """Map a fusion score to a (label, description) pair via thresholds."""
        if score >= self.thresholds["high"]:
            return "Plagiarisme Tinggi", "πŸ”΄ TERDETEKSI PLAGIARISME"
        elif score >= self.thresholds["medium"]:
            return "Mirip / Parafrase", "🟑 KEMUNGKINAN PARAFRASE"
        else:
            return "Unik / Tidak Mirip", "🟒 KONTEN ORIGINAL"

    def compare(
        self,
        text1: str,
        text2: str,
        return_embeddings: bool = False,
        verbose: bool = True,
    ) -> Dict:
        """Compare two texts and return per-model + fused similarity scores.

        Args:
            text1, text2: texts to compare (validated/stripped).
            return_embeddings: also include the raw embedding pairs.
            verbose: log progress per model.

        Returns:
            Dict with "similarity" (per-model and fusion scores), "label",
            "description", "weights_used" (a snapshot copy), and optionally
            "embeddings".
        """
        try:
            text1 = self._validate_text(text1, "text1")
            text2 = self._validate_text(text2, "text2")
            if verbose:
                logger.info("\nπŸ” Generating embeddings...")
            if verbose:
                logger.info(" β†’ BGE-M3...")
            bge1 = self.embed_bge(text1)
            bge2 = self.embed_bge(text2)
            sim_bge = self.cosine_similarity(bge1, bge2)
            if verbose:
                logger.info(" β†’ SBERT Multilingual...")
            sbert1 = self.embed_sbert(text1)
            sbert2 = self.embed_sbert(text2)
            sim_sbert = self.cosine_similarity(sbert1, sbert2)
            if verbose:
                logger.info(" β†’ IndoBERT Fine-Tuned...")
            indo1 = self.embed_indobert(text1)
            indo2 = self.embed_indobert(text2)
            sim_indo = self.cosine_similarity(indo1, indo2)
            # Weighted score fusion of the three per-model similarities.
            fusion_score = (
                sim_bge * self.weights["bge_m3"]
                + sim_sbert * self.weights["sbert"]
                + sim_indo * self.weights["indobert"]
            )
            label, description = self._get_label(fusion_score)
            result = {
                "similarity": {
                    "bge_m3": round(sim_bge, 4),
                    "sbert_multilingual": round(sim_sbert, 4),
                    "indobert": round(sim_indo, 4),
                    "fusion_score": round(fusion_score, 4),
                },
                "label": label,
                "description": description,
                # Copy: the result must not alias the live (mutable) weights.
                "weights_used": dict(self.weights),
            }
            if return_embeddings:
                result["embeddings"] = {
                    "bge_m3": (bge1, bge2),
                    "sbert": (sbert1, sbert2),
                    "indobert": (indo1, indo2),
                }
            return result
        except Exception as e:
            logger.error(f"❌ Error in comparison: {str(e)}")
            raise

    def batch_compare(
        self,
        text_pairs: list,
        verbose: bool = False,
    ) -> list:
        """Run compare() over a list of (text1, text2) pairs.

        When verbose is False, a one-line progress message is logged per
        pair instead of compare()'s detailed per-model logging.
        """
        results = []
        total = len(text_pairs)
        logger.info(f"\nπŸ“Š Processing {total} text pairs...")
        for idx, (text1, text2) in enumerate(text_pairs, 1):
            if not verbose:
                logger.info(f" [{idx}/{total}] Processing...")
            result = self.compare(text1, text2, verbose=verbose)
            results.append(result)
        logger.info("βœ… Batch processing completed!")
        return results

    def update_thresholds(
        self,
        high: Optional[float] = None,
        medium: Optional[float] = None,
    ):
        """Update labeling thresholds, rejecting inconsistent values.

        Raises:
            ValueError: if the resulting medium threshold would exceed
                the high threshold (which would make labeling nonsensical).
        """
        new_high = self.thresholds["high"] if high is None else high
        new_medium = self.thresholds["medium"] if medium is None else medium
        # Validate the combined result BEFORE mutating state.
        if new_medium > new_high:
            raise ValueError(
                f"❌ medium threshold ({new_medium}) must not exceed high ({new_high})"
            )
        self.thresholds["high"] = new_high
        self.thresholds["medium"] = new_medium
        logger.info(f"βœ“ Thresholds updated: {self.thresholds}")

    def update_weights(self, weights: Dict[str, float]):
        """Replace the fusion weights after validation.

        On invalid input the previous weights are restored, so a failed
        update never leaves the instance in an inconsistent state.

        Raises:
            ValueError: if keys are missing or the sum is not ~1.0.
        """
        previous = self.weights
        self.weights = weights
        try:
            self._validate_weights()
        except ValueError:
            # Roll back so the failed call has no side effect.
            self.weights = previous
            raise
        logger.info(f"βœ“ Weights updated: {self.weights}")

    def __repr__(self) -> str:
        return (
            "AkademikForgeFusion(\n"
            f" device={self.device},\n"
            f" weights={self.weights},\n"
            f" thresholds={self.thresholds}\n"
            ")"
        )
def main():
    """CLI demo entry point (example inputs elided in this upload)."""
    ...
    # all example text_a, text_b, pairs, etc. would go here


if __name__ == "__main__":
    main()