Spaces:
Running
Running
File size: 6,310 Bytes
537a1be 4465cb6 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
from __future__ import annotations
import math
import re
from collections import Counter, defaultdict
import numpy as np
import torch
from huggingface_hub import hf_hub_download
from sentence_transformers import SentenceTransformer
from tokenizers import ByteLevelBPETokenizer
from transformers import AutoModelForSeq2SeqLM
from engine.preprocessing import tokenize
class TFIDFModel:
def __init__(self) -> None:
self.idf: dict[str, float] = {}
self.N = 0
def fit(self, corpus: list[str]) -> TFIDFModel:
n = len(corpus)
df: dict[str, int] = defaultdict(int)
for sent in corpus:
for term in set(tokenize(sent)):
df[term] += 1
self.idf = {
term: math.log((n + 1) / (freq + 1)) + 1
for term, freq in df.items()
}
self.N = n
return self
def load_idf(self, idf: dict[str, float], n: int) -> TFIDFModel:
self.idf = idf
self.N = n
return self
def _score(self, sentence: str) -> float:
tokens = tokenize(sentence)
if not tokens:
return 0.0
tf = Counter(tokens)
return sum(tf[t] / len(tokens) * self.idf.get(t, 1.0) for t in tf)
def summarize(self, sentences: list[str], top_n: int = 1) -> list[str]:
if not sentences:
return [""]
scored = sorted(sentences, key=self._score, reverse=True)
return scored[:top_n]
class LexRankModel:
THRESHOLD = 0.1
DAMPING = 0.85
MAX_ITER = 100
TOL = 1e-6
def __init__(self) -> None:
self.idf: dict[str, float] = {}
def fit(self, corpus: list[str]) -> LexRankModel:
n = len(corpus)
df: dict[str, int] = defaultdict(int)
for sent in corpus:
for term in set(tokenize(sent)):
df[term] += 1
self.idf = {
term: math.log((n + 1) / (freq + 1)) + 1
for term, freq in df.items()
}
return self
def load_idf(self, idf: dict[str, float]) -> LexRankModel:
self.idf = idf
return self
def _tfidf_vec(self, sentence: str) -> dict[str, float]:
tokens = tokenize(sentence)
if not tokens:
return {}
tf = Counter(tokens)
return {t: (tf[t] / len(tokens)) * self.idf.get(t, 1.0) for t in tf}
@staticmethod
def _cosine(a: dict[str, float], b: dict[str, float]) -> float:
common = set(a) & set(b)
if not common:
return 0.0
dot = sum(a[t] * b[t] for t in common)
norm_a = math.sqrt(sum(v ** 2 for v in a.values()))
norm_b = math.sqrt(sum(v ** 2 for v in b.values()))
if norm_a == 0 or norm_b == 0:
return 0.0
return dot / (norm_a * norm_b)
def _pagerank(self, matrix: np.ndarray) -> np.ndarray:
n = len(matrix)
row_sums = matrix.sum(axis=1, keepdims=True)
row_sums[row_sums == 0] = 1
p = matrix / row_sums
scores = np.ones(n) / n
for _ in range(self.MAX_ITER):
new_scores = (1 - self.DAMPING) / n + self.DAMPING * p.T @ scores
if np.abs(new_scores - scores).sum() < self.TOL:
break
scores = new_scores
return scores
def summarize(self, sentences: list[str], top_n: int = 1) -> list[str]:
if len(sentences) == 1:
return sentences[:top_n]
vecs = [self._tfidf_vec(s) for s in sentences]
n = len(sentences)
sim = np.zeros((n, n))
for i in range(n):
for j in range(i + 1, n):
c = self._cosine(vecs[i], vecs[j])
if c >= self.THRESHOLD:
sim[i, j] = sim[j, i] = c
if sim.sum() == 0:
scored = sorted(range(n), key=lambda i: sum(vecs[i].values()), reverse=True)
return [sentences[i] for i in scored[:top_n]]
scores = self._pagerank(sim)
ranked = np.argsort(scores)[::-1]
return [sentences[i] for i in ranked[:top_n]]
class SentenceTransformerModel:
def __init__(self, model_name: str = "all-MiniLM-L6-v2") -> None:
self.model = SentenceTransformer(model_name)
def summarize(self, sentences: list[str], top_n: int = 1) -> list[str]:
if not sentences:
return [""]
embeddings = self.model.encode(sentences, convert_to_numpy=True)
centroid = embeddings.mean(axis=0)
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
norms[norms == 0] = 1
sims = (embeddings / norms) @ (centroid / (np.linalg.norm(centroid) + 1e-9))
ranked = np.argsort(sims)[::-1]
return [sentences[i] for i in ranked[:top_n]]
class CodeT5Model:
MODEL_NAME = "Salesforce/codet5-base-codexglue-sum-java"
VOCAB_REPO = "Salesforce/codet5-base"
_SPECIAL_TOKENS = ("<pad>", "<s>", "</s>", "<unk>", "<mask>")
def __init__(self) -> None:
vocab_file = hf_hub_download(self.VOCAB_REPO, "vocab.json")
merges_file = hf_hub_download(self.VOCAB_REPO, "merges.txt")
self.tokenizer = ByteLevelBPETokenizer(vocab_file, merges_file)
self.model = AutoModelForSeq2SeqLM.from_pretrained(self.MODEL_NAME)
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.model.to(self.device)
self.model.eval()
def _clean(self, text: str) -> str:
for tok in self._SPECIAL_TOKENS:
text = text.replace(tok, " ")
text = re.sub(r"<extra_id_\d+>", " ", text)
return re.sub(r"\s+", " ", text).strip()
def summarize(self, raw_code: str) -> str:
if not raw_code or not raw_code.strip():
return ""
ids = self.tokenizer.encode(raw_code).ids[:256]
input_ids = torch.tensor([ids], device=self.device)
attention = torch.ones_like(input_ids)
with torch.no_grad():
output_ids = self.model.generate(
input_ids=input_ids,
attention_mask=attention,
max_new_tokens=48,
num_beams=4,
early_stopping=True,
no_repeat_ngram_size=3,
)
decoded = self.tokenizer.decode(output_ids[0].tolist(), skip_special_tokens=False)
return self._clean(decoded)
|