Spaces:
Running
Running
| from __future__ import annotations | |
| import math | |
| import re | |
| from collections import Counter, defaultdict | |
| import numpy as np | |
| import torch | |
| from huggingface_hub import hf_hub_download | |
| from sentence_transformers import SentenceTransformer | |
| from tokenizers import ByteLevelBPETokenizer | |
| from transformers import AutoModelForSeq2SeqLM | |
| from engine.preprocessing import tokenize | |
class TFIDFModel:
    """Extractive summarizer that ranks sentences by their TF-IDF mass.

    IDF weights are either learned from a corpus with :meth:`fit` or
    injected with :meth:`load_idf`. Tokenization is delegated to the
    project-level ``tokenize`` helper (assumed to return a list of
    hashable tokens -- confirm against ``engine.preprocessing``).
    """

    def __init__(self) -> None:
        # term -> smoothed inverse document frequency
        self.idf: dict[str, float] = {}
        # number of sentences the IDF table was built from
        self.N = 0

    def fit(self, corpus: list[str]) -> TFIDFModel:
        """Learn IDF weights from *corpus*, one document per sentence.

        Each term's weight is ``log((n + 1) / (df + 1)) + 1`` where ``n`` is
        the corpus size and ``df`` the number of sentences containing the
        term.

        Returns:
            ``self``, so calls can be chained.
        """
        doc_freq: Counter[str] = Counter()
        for sentence in corpus:
            # A set so that a term counts at most once per sentence.
            doc_freq.update(set(tokenize(sentence)))

        total = len(corpus)
        self.idf = {
            term: math.log((total + 1) / (count + 1)) + 1
            for term, count in doc_freq.items()
        }
        self.N = total
        return self

    def load_idf(self, idf: dict[str, float], n: int) -> TFIDFModel:
        """Install a precomputed IDF table and its corpus size.

        The mapping is stored by reference, not copied.

        Returns:
            ``self``, so calls can be chained.
        """
        self.idf = idf
        self.N = n
        return self

    def _score(self, sentence: str) -> float:
        """Return the sum of ``tf * idf`` over the distinct terms of *sentence*.

        Terms missing from the IDF table are weighted 1.0; a sentence that
        yields no tokens scores 0.0.
        """
        tokens = tokenize(sentence)
        if not tokens:
            return 0.0
        length = len(tokens)
        return sum(
            count / length * self.idf.get(term, 1.0)
            for term, count in Counter(tokens).items()
        )

    def summarize(self, sentences: list[str], top_n: int = 1) -> list[str]:
        """Return the *top_n* highest-scoring sentences, best first.

        An empty input yields ``[""]`` rather than an empty list.
        """
        if not sentences:
            return [""]
        ranked = list(sentences)
        ranked.sort(key=self._score, reverse=True)
        return ranked[:top_n]
class LexRankModel:
    """Graph-based extractive summarizer in the style of LexRank.

    Sentences become sparse TF-IDF vectors; two sentences are linked when
    their cosine similarity reaches ``THRESHOLD``; a damped power iteration
    over the row-normalized similarity graph then ranks them.
    """

    THRESHOLD = 0.1  # minimum cosine similarity for two sentences to be linked
    DAMPING = 0.85  # weight of the graph term in each power-iteration step
    MAX_ITER = 100  # upper bound on power-iteration steps
    TOL = 1e-6  # L1 change between steps below which iteration stops

    def __init__(self) -> None:
        # term -> smoothed inverse document frequency
        self.idf: dict[str, float] = {}

    def fit(self, corpus: list[str]) -> LexRankModel:
        """Learn IDF weights from *corpus*, one document per sentence.

        Each term's weight is ``log((n + 1) / (df + 1)) + 1``.

        Returns:
            ``self``, so calls can be chained.
        """
        n = len(corpus)
        df: dict[str, int] = defaultdict(int)
        for sent in corpus:
            # A set so that a term counts at most once per sentence.
            for term in set(tokenize(sent)):
                df[term] += 1
        self.idf = {
            term: math.log((n + 1) / (freq + 1)) + 1
            for term, freq in df.items()
        }
        return self

    def load_idf(self, idf: dict[str, float]) -> LexRankModel:
        """Install a precomputed IDF table (stored by reference).

        Returns:
            ``self``, so calls can be chained.
        """
        self.idf = idf
        return self

    def _tfidf_vec(self, sentence: str) -> dict[str, float]:
        """Return the sparse ``term -> tf * idf`` vector for *sentence*.

        Terms missing from the IDF table are weighted 1.0; a sentence that
        yields no tokens maps to an empty dict.
        """
        tokens = tokenize(sentence)
        if not tokens:
            return {}
        tf = Counter(tokens)
        return {t: (tf[t] / len(tokens)) * self.idf.get(t, 1.0) for t in tf}

    # Declared static: the function takes no ``self`` but is invoked through
    # the instance in ``summarize``; without the decorator that call would
    # pass three positional arguments and raise ``TypeError``.
    @staticmethod
    def _cosine(a: dict[str, float], b: dict[str, float]) -> float:
        """Return the cosine similarity of two sparse vectors.

        Vectors with no shared key, or with zero norm, have similarity 0.0.
        """
        common = set(a) & set(b)
        if not common:
            return 0.0
        dot = sum(a[t] * b[t] for t in common)
        norm_a = math.sqrt(sum(v ** 2 for v in a.values()))
        norm_b = math.sqrt(sum(v ** 2 for v in b.values()))
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return dot / (norm_a * norm_b)

    def _pagerank(self, matrix: np.ndarray) -> np.ndarray:
        """Run a damped power iteration over a square similarity matrix.

        Args:
            matrix: ``n x n`` array of non-negative edge weights.

        Returns:
            A length-``n`` array of scores, one per row of *matrix*.
        """
        n = len(matrix)
        row_sums = matrix.sum(axis=1, keepdims=True)
        # Isolated nodes have an all-zero row; divide by 1 to avoid NaNs.
        row_sums[row_sums == 0] = 1
        p = matrix / row_sums
        scores = np.ones(n) / n
        for _ in range(self.MAX_ITER):
            new_scores = (1 - self.DAMPING) / n + self.DAMPING * p.T @ scores
            if np.abs(new_scores - scores).sum() < self.TOL:
                break
            scores = new_scores
        return scores

    def summarize(self, sentences: list[str], top_n: int = 1) -> list[str]:
        """Return the *top_n* most central sentences, best first.

        An empty input yields ``[""]``, matching the other summarizers in
        this module. A single sentence is returned as-is (subject to
        *top_n*). If no pair of sentences reaches ``THRESHOLD`` the ranking
        falls back to the sum of each sentence's TF-IDF weights.
        """
        if not sentences:
            return [""]
        if len(sentences) == 1:
            return sentences[:top_n]

        vecs = [self._tfidf_vec(s) for s in sentences]
        n = len(sentences)

        # Symmetric similarity graph; the diagonal stays zero (no self-loops).
        sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                c = self._cosine(vecs[i], vecs[j])
                if c >= self.THRESHOLD:
                    sim[i, j] = sim[j, i] = c

        if sim.sum() == 0:
            # No edges at all: graph ranking carries no signal.
            scored = sorted(
                range(n), key=lambda i: sum(vecs[i].values()), reverse=True
            )
            return [sentences[i] for i in scored[:top_n]]

        scores = self._pagerank(sim)
        ranked = np.argsort(scores)[::-1]
        return [sentences[i] for i in ranked[:top_n]]
class SentenceTransformerModel:
    """Extractive summarizer that picks sentences nearest the embedding centroid."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2") -> None:
        """Load the sentence-embedding model identified by *model_name*."""
        self.model = SentenceTransformer(model_name)

    def summarize(self, sentences: list[str], top_n: int = 1) -> list[str]:
        """Return the *top_n* sentences most similar to the mean embedding.

        Similarity is the cosine between each sentence embedding and the
        centroid of all embeddings. An empty input yields ``[""]``.
        """
        if not sentences:
            return [""]

        vectors = self.model.encode(sentences, convert_to_numpy=True)

        # Unit-normalize each row; all-zero rows are divided by 1 instead.
        lengths = np.linalg.norm(vectors, axis=1, keepdims=True)
        lengths[lengths == 0] = 1
        unit_vectors = vectors / lengths

        # The epsilon keeps the division finite if the centroid is all zeros.
        center = vectors.mean(axis=0)
        unit_center = center / (np.linalg.norm(center) + 1e-9)

        similarity = unit_vectors @ unit_center
        best_first = np.argsort(similarity)[::-1][:top_n]
        return [sentences[idx] for idx in best_first]
class CodeT5Model:
    """Abstractive code summarizer backed by a seq2seq checkpoint.

    The tokenizer is built directly from the ``vocab.json`` / ``merges.txt``
    files of ``VOCAB_REPO``; special tokens are stripped from the decoded
    text by :meth:`_clean`.
    """

    MODEL_NAME = "Salesforce/codet5-base-codexglue-sum-java"
    VOCAB_REPO = "Salesforce/codet5-base"

    # Tunables, exposed as class constants so they can be overridden per
    # subclass or per instance instead of being buried in ``summarize``.
    MAX_INPUT_TOKENS = 256  # input ids beyond this count are dropped
    MAX_NEW_TOKENS = 48  # passed to ``generate`` as ``max_new_tokens``
    NUM_BEAMS = 4  # passed to ``generate`` as ``num_beams``
    NO_REPEAT_NGRAM_SIZE = 3  # passed to ``generate`` as ``no_repeat_ngram_size``

    _SPECIAL_TOKENS = ("<pad>", "<s>", "</s>", "<unk>", "<mask>")
    # Compiled once; both patterns are applied on every ``_clean`` call.
    _SENTINEL_RE = re.compile(r"<extra_id_\d+>")
    _WHITESPACE_RE = re.compile(r"\s+")

    def __init__(self) -> None:
        """Download the vocabulary files, load the model and move it to the device."""
        vocab_file = hf_hub_download(self.VOCAB_REPO, "vocab.json")
        merges_file = hf_hub_download(self.VOCAB_REPO, "merges.txt")
        self.tokenizer = ByteLevelBPETokenizer(vocab_file, merges_file)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(self.MODEL_NAME)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(self.device)
        self.model.eval()

    def _clean(self, text: str) -> str:
        """Strip special and ``<extra_id_N>`` tokens and collapse whitespace."""
        for tok in self._SPECIAL_TOKENS:
            text = text.replace(tok, " ")
        text = self._SENTINEL_RE.sub(" ", text)
        return self._WHITESPACE_RE.sub(" ", text).strip()

    def summarize(self, raw_code: str) -> str:
        """Generate a one-line natural-language summary of *raw_code*.

        Returns:
            The cleaned decoded text, or ``""`` when *raw_code* is empty or
            whitespace-only.
        """
        if not raw_code or not raw_code.strip():
            return ""

        # NOTE(review): the ids are used exactly as the raw BPE tokenizer
        # emits them; nothing here wraps them in <s>/</s>. Confirm that is
        # what the checkpoint expects.
        ids = self.tokenizer.encode(raw_code).ids[: self.MAX_INPUT_TOKENS]
        input_ids = torch.tensor([ids], device=self.device)
        # Single unpadded sequence, so every position is attended to.
        attention = torch.ones_like(input_ids)

        with torch.no_grad():
            output_ids = self.model.generate(
                input_ids=input_ids,
                attention_mask=attention,
                max_new_tokens=self.MAX_NEW_TOKENS,
                num_beams=self.NUM_BEAMS,
                early_stopping=True,
                no_repeat_ngram_size=self.NO_REPEAT_NGRAM_SIZE,
            )

        # Special tokens are kept by the decoder and removed in ``_clean``.
        decoded = self.tokenizer.decode(
            output_ids[0].tolist(), skip_special_tokens=False
        )
        return self._clean(decoded)