# plagitp.py — notebook plagiarism-detection utilities built on copydetect.
import time
import logging
from tqdm import tqdm
import numpy as np
from copydetect.utils import (filter_code, highlight_overlap, get_copied_slices,
get_document_fingerprints, find_fingerprint_overlap,
get_token_coverage)
from copydetect import defaults
from dataclasses import dataclass, field
from typing import Optional, List, Dict, ClassVar
import re
@dataclass
class CopydetectConfig:
    """Validated configuration for :class:`CopyDetector`.

    NOTE(review): despite the *_dirs names inherited from copydetect, the
    lists appear to hold code strings directly in this variant (see
    compare_notebook) — confirm against callers.
    """

    test_dirs: List[str] = field(default_factory=lambda: [])
    ref_dirs: Optional[List[str]] = field(default_factory=lambda: [])
    boilerplate_dirs: Optional[List[str]] = field(default_factory=lambda: [])
    noise_t: int = defaults.NOISE_THRESHOLD
    guarantee_t: int = defaults.GUARANTEE_THRESHOLD
    display_t: float = defaults.DISPLAY_THRESHOLD
    disable_filtering: bool = False
    force_language: Optional[str] = None
    truncate: bool = False
    silent: bool = False
    encoding: str = "utf-8"
    # Derived winnowing window size. Bug fix: the original declared
    # `default=guarantee_t - noise_t + 1`, which is evaluated once at
    # class-definition time from the module defaults and therefore ignored
    # any per-instance thresholds. It is now computed in __post_init__.
    window_size: int = field(init=False, default=0)

    # Mapping from long (CLI/JSON style) option names to field names.
    short_names: ClassVar[Dict[str, str]] = {
        "noise_threshold": "noise_t",
        "guarantee_threshold": "guarantee_t",
        "display_threshold": "display_t",
        "test_directories": "test_dirs",
        "reference_directories": "ref_dirs",
        "boilerplate_directories": "boilerplate_dirs",
    }

    def __post_init__(self):
        # Compute the window size from THIS instance's thresholds.
        self.window_size = self.guarantee_t - self.noise_t + 1

    def _check_arguments(self):
        """Validate types and values; coerce whole-valued float thresholds.

        Raises TypeError for wrong types and ValueError for out-of-range
        or inconsistent threshold values.
        """
        if not isinstance(self.test_dirs, list):
            raise TypeError("Test directories must be a list")
        if not isinstance(self.ref_dirs, list):
            raise TypeError("Reference directories must be a list")
        if not isinstance(self.boilerplate_dirs, list):
            raise TypeError("Boilerplate directories must be a list")
        if not isinstance(self.disable_filtering, bool):
            raise TypeError("disable_filtering must be true or false")
        if self.force_language is not None:
            if not isinstance(self.force_language, str):
                raise TypeError("force_language must be a string")
        if not isinstance(self.truncate, bool):
            raise TypeError("truncate must be true or false")
        # Accept floats that are exact integers (e.g. 25.0) and coerce them.
        if not isinstance(self.noise_t, int):
            if int(self.noise_t) == self.noise_t:
                self.noise_t = int(self.noise_t)
                self.window_size = int(self.window_size)
            else:
                raise TypeError("Noise threshold must be an integer")
        if not isinstance(self.guarantee_t, int):
            if int(self.guarantee_t) == self.guarantee_t:
                self.guarantee_t = int(self.guarantee_t)
                self.window_size = int(self.window_size)
            else:
                raise TypeError("Guarantee threshold must be an integer")
        # value checking
        if self.guarantee_t < self.noise_t:
            raise ValueError(
                "Guarantee threshold must be greater than or "
                "equal to noise threshold"
            )
        if self.display_t > 1 or self.display_t < 0:
            raise ValueError("Display threshold must be between 0 and 1")
class CodeFingerprint:
    """Winnowing fingerprint of a single piece of code.

    Parameters
    ----------
    file : str or bytes
        The code content itself (not a path). NOTE(review): the
        encoding == "DETECT" branch assumes bytes — confirm callers.
    k : int
        Noise threshold / k-gram length.
    win_size : int
        Winnowing window size.
    boilerplate : array-like, optional
        Fingerprint hashes to exclude from this document.
    filter : bool
        If True, tokenize/normalize the code before hashing.
    encoding : str
        Text encoding; "DETECT" sniffs with chardet (optional dependency).
    force_language : str
        Language hint for the filter; "python" additionally triggers
        import-statement normalization (see modify_code).
    """

    def __init__(self, file, k, win_size, boilerplate=None, filter=True,
                 encoding: str = "utf-8", force_language="python"):
        if boilerplate is None:
            boilerplate = []
        if encoding == "DETECT":
            try:
                import chardet
                code = file
                detected_encoding = chardet.detect(code)["encoding"]
                if detected_encoding is not None:
                    code = code.decode(detected_encoding)
                else:
                    code = code.decode()
            except ModuleNotFoundError as e:
                logging.error("encoding detection requires chardet to be installed")
                raise e
        else:
            code = file
        if filter:
            # Normalize Python imports first so aliasing tricks
            # (import x as y) do not hide copied code.
            if force_language == "python":
                code = self.modify_code(code)
            filtered_code, offsets = filter_code(code, None, force_language)
        else:
            filtered_code, offsets = code, np.array([])
        hashes, idx = get_document_fingerprints(filtered_code, k, win_size, boilerplate)
        self.raw_code = code
        self.filtered_code = filtered_code
        self.offsets = offsets
        self.hashes = hashes
        self.hash_idx = idx
        self.k = k
        self.token_coverage = get_token_coverage(idx, k, len(filtered_code))

    def modify_code(self, code):
        """Canonicalize Python import styles so renames don't defeat matching.

        - "from mod import a, b" -> "import mod"; uses of a/b become mod_a/mod_b
        - "import mod as m"      -> "import mod"; "m."   becomes "mod_"
        - "import mod"           ->               "mod." becomes "mod_"
        - double quotes are replaced with single quotes
        """
        # Collect all "from X import a, b, ..." statements.
        from_statements = re.findall(r'\bfrom\s+(\w+(?:\.\w+)*)\s+import\s+((?:\w+\s*,\s*)*\w+)\b', code)
        unique_elements = set()
        for mod_name, elements_str in from_statements:
            # Bug fix: escape mod_name — dotted module names contain '.',
            # a regex metacharacter, and were previously interpolated raw.
            code = re.sub(rf'\bfrom\s+{re.escape(mod_name)}\s+import\s+((?:\w+\s*,\s*)*\w+)\b',
                          f'import {mod_name}', code)
            elements = [e.strip() for e in elements_str.split(',')]
            unique_elements.update((mod_name, element) for element in elements)
        # Rewrite each imported name to a module-qualified identifier.
        for mod_name, element in unique_elements:
            replacement = f'{mod_name}_{element}'
            code = re.sub(rf'(?<!\.)\b{re.escape(element)}\b', replacement, code)
        # Handle "import X as y": replace "y." with "X_" and drop the alias.
        import_statements = re.findall(r'\bimport\s+(\w+(?:\.\w+)*)\s+as\s+(\w+)', code)
        for mod_name, short_alias in import_statements:
            replacement = rf'{mod_name}_'
            code = re.sub(rf'\b{re.escape(short_alias)}\.', replacement, code)
            code = re.sub(rf'\bimport\s+{re.escape(mod_name)}\s+as\s+{re.escape(short_alias)}\b',
                          f'import {mod_name}', code)
        # Handle plain "import X": replace "X." with "X_".
        import_statements = re.findall(r'\bimport\s+(\w+(?:\.\w+)*)\s', code)
        for mod_name in import_statements:
            replacement = rf'{mod_name}_'
            code = re.sub(rf'\b{re.escape(mod_name)}\.', replacement, code)
        code = code.replace('"', "'")
        return code
class CopyDetector:
    """Pairwise plagiarism detector over a set of test and reference sources.

    NOTE(review): despite the *_dirs parameter names inherited from
    copydetect, the lists here appear to hold code strings directly
    (see compare_notebook) — confirm against callers.
    """

    def __init__(self, test_dirs=None, ref_dirs=None,
                 boilerplate_dirs=None,
                 noise_t=defaults.NOISE_THRESHOLD,
                 guarantee_t=defaults.GUARANTEE_THRESHOLD,
                 display_t=defaults.DISPLAY_THRESHOLD,
                 disable_filtering=False, force_language="python",
                 truncate=False, silent=False,
                 encoding: str = "utf-8"):
        # Bundle every constructor argument (except self and Nones) into
        # the config dataclass; field names match parameter names.
        conf_args = locals()
        conf_args = {
            key: val
            for key, val in conf_args.items()
            if key != "self" and val is not None
        }
        self.conf = CopydetectConfig(**conf_args)
        # Re-set noise_t/window_size explicitly: the dataclass default for
        # window_size is evaluated from module defaults at class-definition
        # time, so it would otherwise ignore the thresholds passed here.
        self.conf.noise_t=noise_t
        self.conf.window_size=guarantee_t-noise_t+1
        self.test_files = self.conf.test_dirs
        self.ref_files = self.conf.ref_dirs
        self.boilerplate_files = self.conf.boilerplate_dirs
        # Result containers, populated by run().
        self.similarity_matrix = np.array([])
        self.token_overlap_matrix = np.array([])
        self.slice_matrix = {}
        self.file_data = {}

    def _get_boilerplate_hashes(self):
        # Fingerprint each boilerplate source and pool the unique hashes
        # so they can be excluded from every comparison.
        boilerplate_hashes = []
        for file in self.boilerplate_files:
            try:
                fingerprint = CodeFingerprint(
                    file,
                    k=self.conf.noise_t,
                    win_size=1, #?? self.conf.window_size
                    filter=not self.conf.disable_filtering,
                    encoding=self.conf.encoding,
                    force_language=self.conf.force_language
                )
                boilerplate_hashes.extend(fingerprint.hashes)
            except UnicodeDecodeError:
                logging.warning(f"Skipping {file}: file not UTF-8 text")
                continue
        return np.unique(np.array(boilerplate_hashes))

    def _preprocess_code(self, file_list):
        # Fingerprint every test + reference source, keyed by sequential id.
        # NOTE(review): fid only advances for successfully fingerprinted
        # sources, so a skipped (undecodable) entry would shift all later
        # ids relative to positions in test_files/ref_files used by run().
        boilerplate_hashes = self._get_boilerplate_hashes()
        fid=0
        for code_f in file_list:
            try:
                self.file_data[fid] = CodeFingerprint(
                    code_f, self.conf.noise_t, self.conf.window_size,
                    boilerplate_hashes, not self.conf.disable_filtering,
                    encoding=self.conf.encoding, force_language=self.conf.force_language)
            except UnicodeDecodeError:
                logging.warning(f"Skipping {code_f}: file not UTF-8 text")
                continue
            fid+=1

    def compare_files(self, file1_data, file2_data):
        """Compare two CodeFingerprint objects.

        Returns (token_overlap, (similarity1, similarity2),
        (slices1, slices2)): the overlapping token count, each file's share
        of overlapping tokens, and the overlap spans mapped back to
        positions in the unfiltered code.
        """
        if file1_data.k != file2_data.k:
            raise ValueError("Code fingerprints must use the same noise threshold")
        idx1, idx2 = find_fingerprint_overlap(
            file1_data.hashes, file2_data.hashes,
            file1_data.hash_idx, file2_data.hash_idx)
        slices1 = get_copied_slices(idx1, file1_data.k)
        slices2 = get_copied_slices(idx2, file2_data.k)
        if len(slices1[0]) == 0:
            # No overlap at all.
            return 0, (0,0), (np.array([]), np.array([]))
        token_overlap1 = np.sum(slices1[1] - slices1[0])
        token_overlap2 = np.sum(slices2[1] - slices2[0])
        if len(file1_data.filtered_code) > 0:
            similarity1 = token_overlap1 / file1_data.token_coverage
        else:
            similarity1 = 0
        if len(file2_data.filtered_code) > 0:
            similarity2 = token_overlap2 / file2_data.token_coverage
        else:
            similarity2 = 0
        # Translate filtered-code slice positions back to raw-code
        # positions using the stored offset tables.
        if len(file1_data.offsets) > 0:
            slices1 += file1_data.offsets[:,1][np.clip(
                np.searchsorted(file1_data.offsets[:,0], slices1),
                0, file1_data.offsets.shape[0] - 1)]
        if len(file2_data.offsets) > 0:
            slices2 += file2_data.offsets[:,1][np.clip(
                np.searchsorted(file2_data.offsets[:,0], slices2),
                0, file2_data.offsets.shape[0] - 1)]
        return token_overlap1, (similarity1,similarity2), (slices1,slices2)

    def run(self):
        """Fingerprint all sources and fill similarity/overlap matrices."""
        start_time = time.time()
        if not self.conf.silent:
            print(" 0.00: Generating file fingerprints")
        self._preprocess_code(self.test_files + self.ref_files)
        # similarity_matrix[i, j] holds (test-side sim, ref-side sim);
        # -1 marks "not compared".
        self.similarity_matrix = np.full(
            (len(self.test_files), len(self.ref_files), 2),
            -1,
            dtype=np.float64,
        )
        self.token_overlap_matrix = np.full(
            (len(self.test_files), len(self.ref_files)), -1
        )
        self.slice_matrix = {}
        if not self.conf.silent:
            print(f"{time.time()-start_time:6.2f}: Beginning code comparison")
        comparisons = {}  # NOTE(review): written but never read
        for i, test_f in enumerate(
            tqdm(self.test_files,
                 bar_format= ' {l_bar}{bar}{r_bar}',
                 disable=self.conf.silent)
        ):
            for j, ref_f in enumerate(self.ref_files):
                # Reference fingerprints follow the test ones in file_data.
                overlap, (sim1, sim2), (slices1, slices2) = self.compare_files(
                    self.file_data[i], self.file_data[j+len(self.test_files)]
                )
                comparisons[(i, j)] = (i, j)
                if slices1.shape[0] != 0:
                    self.slice_matrix[(i, j)] = [slices1, slices2]
                self.similarity_matrix[i, j] = np.array([sim1, sim2])
                self.token_overlap_matrix[i, j] = overlap
        if not self.conf.silent:
            print(f"{time.time()-start_time:6.2f}: Code comparison completed")

    def get_copied_code_list(self):
        """Return highlighted comparison rows above the display threshold.

        Each row: [test_sim, ref_sim, test_idx, ref_idx, highlighted test
        HTML, highlighted ref HTML, token_overlap], sorted by descending
        test_sim. Requires run() to have been called.
        """
        if len(self.similarity_matrix) == 0:
            logging.error("Cannot generate code list: no files compared")
            return []
        x,y = np.where(self.similarity_matrix[:,:,0] > self.conf.display_t)
        code_list = []
        file_pairs = set()
        for idx in range(len(x)):
            test_f = x[idx]
            ref_f = y[idx]
            if (ref_f, test_f) in file_pairs:
                # if comparison is already in report, don't add it again
                continue
            file_pairs.add((test_f, ref_f))
            test_sim = self.similarity_matrix[x[idx], y[idx], 0]
            ref_sim = self.similarity_matrix[x[idx], y[idx], 1]
            if (test_f, ref_f) in self.slice_matrix:
                slices_test = self.slice_matrix[(test_f, ref_f)][0]
                slices_ref = self.slice_matrix[(test_f, ref_f)][1]
            else:
                # Pair was stored in the mirrored orientation.
                slices_test = self.slice_matrix[(ref_f, test_f)][1]
                slices_ref = self.slice_matrix[(ref_f, test_f)][0]
            if self.conf.truncate:
                truncate = 10
            else:
                truncate = -1
            hl_code_1, _ = highlight_overlap(
                self.file_data[test_f].raw_code, slices_test,
                "<font color='red'>", "</font>",
                truncate=truncate, escape_html=True)
            hl_code_2, _ = highlight_overlap(
                self.file_data[ref_f+len(self.test_files)].raw_code, slices_ref,
                "<font color='green'>", "</font>",
                truncate=truncate, escape_html=True)
            overlap = self.token_overlap_matrix[x[idx], y[idx]]
            code_list.append([test_sim, ref_sim, test_f, ref_f,
                              hl_code_1, hl_code_2, overlap])
        code_list.sort(key=lambda x: -x[0])
        return code_list
def infos_title(report_title):
    """Extract both student names and the generation timestamp from a report title.

    Returns (student1, student2, timestamp); any piece that is not found
    comes back as the empty string.
    """
    name_re = re.compile(r"<b>Student\d:</b>\s*(.*?)\s*\<b>email:</b>")
    stamp_re = re.compile(r"<b>Report generated at:</b> (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")
    names = name_re.findall(report_title)
    student1 = names[0] if names else ""
    student2 = names[1] if len(names) > 1 else ""
    stamp_match = stamp_re.search(report_title)
    stamp = stamp_match.group(1) if stamp_match else ""
    return student1, student2, stamp
def get_notebook_infos(notebook, add_id=False):
    """Concatenate a notebook's code and markdown cell sources.

    Parameters
    ----------
    notebook : nbformat-style notebook
        Object with a .cells list; each cell has .cell_type and supports
        cell["source"] / cell["execution_count"] item access.
    add_id : bool
        Unused; kept for backward compatibility with existing callers.

    Returns
    -------
    ([codes], [markdowns], errors) : all code joined into one string (in a
    one-element list), likewise for markdown, and a flag set when any code
    cell has a falsy execution_count (i.e. was never executed).
    """
    # Removed: large amounts of commented-out per-cell-id bookkeeping code.
    codes = ""
    markdowns = ""
    errors = False
    for cell in notebook.cells:
        if cell.cell_type == 'code':
            text = cell["source"]
            if len(text) > 0:
                codes += text
            if not cell["execution_count"]:
                errors = True
        elif cell.cell_type == 'markdown':
            text = cell["source"]
            if len(text) > 0:
                markdowns += text
    return [codes], [markdowns], errors
def compare_notebook(notebook1, notebook2, boiler=None, boiler_m=None, noise_t=5, guarantee_t=9):
    """Compare two notebooks' code and markdown content for similarity.

    Parameters
    ----------
    notebook1, notebook2 : nbformat-style notebooks to compare.
    boiler, boiler_m : list, optional
        Boilerplate code / markdown sources to ignore.
    noise_t, guarantee_t : int
        copydetect noise and guarantee thresholds.

    Returns
    -------
    (similarity, errors_n1) : similarity maps {0: code_sim, 1: text_sim}
    (entries missing when the respective content is empty); errors_n1 flags
    unexecuted code cells in notebook1.
    """
    # Bug fix: avoid mutable default arguments (shared-list pitfall).
    if boiler is None:
        boiler = []
    if boiler_m is None:
        boiler_m = []
    codes_n1, markdowns_n1, errors_n1 = get_notebook_infos(notebook1, add_id=True)
    codes_n2, markdowns_n2, errors_n2 = get_notebook_infos(notebook2)
    # Code comparison (with Python filtering/normalization).
    codes_sim = []
    if len(codes_n1) > 0 and len(codes_n2) > 0:
        detector = CopyDetector(test_dirs=codes_n1, boilerplate_dirs=boiler,
                                ref_dirs=codes_n2, force_language="python",
                                noise_t=noise_t, guarantee_t=guarantee_t,
                                display_t=0.5, silent=True)
        detector.run()
        # Conservative score: min over the two directions, then the best
        # match across references.
        sm = detector.similarity_matrix.min(axis=2)
        codes_sim = sm.max(axis=1)
    # Markdown comparison (raw text, filtering disabled).
    texts_sim = []
    if len(markdowns_n1) > 0 and len(markdowns_n2) > 0:
        detector_m = CopyDetector(test_dirs=markdowns_n1, boilerplate_dirs=boiler_m,
                                  ref_dirs=markdowns_n2, noise_t=noise_t,
                                  guarantee_t=guarantee_t, display_t=0.5,
                                  silent=True, disable_filtering=True)
        detector_m.run()
        sm_m = detector_m.similarity_matrix.min(axis=2)
        texts_sim = sm_m.max(axis=1)
    lc = list(codes_sim) + list(texts_sim)
    li = [0, 1]  # fixed section ids: 0 = code, 1 = markdown
    similarity = dict(zip(li, lc))
    return similarity, errors_n1
def analyse_notebook(notebook, notebooks_ref, exceptkeys, ignore_code=None, ingnore_text=None):
    """Compare one notebook against a collection of reference notebooks.

    Parameters
    ----------
    notebook : the notebook under test.
    notebooks_ref : dict mapping student id -> {'report': notebook, ...}.
    exceptkeys : ids to skip (e.g. the submitting student's own id).
    ignore_code, ingnore_text : list, optional
        Boilerplate code / markdown to ignore. NOTE: the misspelled
        parameter name `ingnore_text` is kept for keyword-caller
        compatibility.

    Returns
    -------
    (plagiarism, copiedfrom, err) : per-section max similarity, the
    reference id that produced it, and the error flag from the LAST
    comparison performed.
    """
    # Bug fix: avoid mutable default arguments.
    if ignore_code is None:
        ignore_code = []
    if ingnore_text is None:
        ingnore_text = []
    plagiarism = {}
    copiedfrom = {}
    err = False
    for suid, n_ref in notebooks_ref.items():
        if suid in exceptkeys:
            continue
        sim, err = compare_notebook(notebook, n_ref['report'],
                                    boiler=ignore_code, boiler_m=ingnore_text)
        # Keep the running maximum per section; >= means later references
        # win ties, matching the original behavior.
        for k in sim:
            if sim[k] >= plagiarism.get(k, 0):
                plagiarism[k] = sim[k]
                copiedfrom[k] = suid
    return plagiarism, copiedfrom, err
import base64  # used by down_html below to build data-URI download links

# CSS injected into the web UI: column widths, color overrides for the
# syntax-highlighted report HTML (.htm), and always-visible scrollbars.
custom_css = """
.student{
max-width: 100px !important;
}
.button{
max-width: 350px !important;
}
.htm span .dd, .n, .nn, .nb, .p, .bp{
color: black !important;
}
.htm .highlight pre{
color: black !important;
}
.htm span .dd, .fm, .nc, .nf{
color: blue !important;
}
.htm span .dd, .nd{
color: magenta !important;
}
.test .scroll-hide::-webkit-scrollbar {
display: initial !important;
width: 12px !important;
background-color: #ddd !important;
}
.test .scroll-hide::-webkit-scrollbar-thumb {
background-color: #6366f1 !important;
}
.test .scroll-hide::-webkit-scrollbar-thumb:hover {
background-color: #6366f199 !important;
cursor: pointer;
}
"""
def plagia_error(rate, students, desc=""):
    """Format an HTML alert line for a plagiarism similarity rate.

    Color encodes severity: red >= 80, orange >= 50, green otherwise.
    """
    if rate >= 80:
        color = "red"
    elif rate >= 50:
        color = "orange"
    else:
        color = "green"
    return f"<div style='color: {color}; font-size: 12px;'>{desc}: max similarity rate {rate}%, id:{students}</div>"
def user_html(email, photoUrl, expiresAt):
    """Render the logged-in user header: avatar, email, session expiry, logout.

    Parameters
    ----------
    email : str
    photoUrl : str, avatar image URL.
    expiresAt : int/float, session expiry as a POSIX timestamp
        (rendered in local time).
    """
    # Bug fix: `datetime` was never imported anywhere in this module, so
    # calling this function raised NameError. Imported locally here.
    from datetime import datetime
    expireTime = datetime.fromtimestamp(expiresAt)
    str_show = f"""
    <div>
    <div style='width:64; float:left; margin:8px;'>
    <img src="{photoUrl}" width="64" height="64"/>
    </div>
    <div style=' float:left; padding:8px;'>
    {email}<br/>
    <span id='expire'>Session expires at: {expireTime}</span> &nbsp;&nbsp;
    <br/>
    <a href="/logout">
    <button
    style='border-radius: 8px; font-weight: bold; background-color: #e0e7ff; color: #6366f1; border: none; padding: 5px 10px;'>
    Logout
    </button></a>
    </div>"""
    return str_show
def reports_html(reports):
    """Render the uploaded-reports list as an HTML table.

    Rows are numbered in reverse (newest first); missing 'grade' shows
    'Not graded', optional 'down' is appended to the date cell.
    """
    parts = ["""<style>
table {width: 100%;border-collapse: collapse;margin-top: 20px;}
th, td {border: 1px solid #ddd;padding: 8px;text-align: left;}
th {background-color: #AAA; padding: 8px}
</style>
<table>
<thead><tr><th>Students</th><th>Report Uploaded At</th><th>Grading</th></tr></thead>
<tbody>
"""]
    total = len(reports)
    for pos, rep in enumerate(reports):
        parts.append(f"<tr><td>{total-pos}. {rep['students']}</td><td>{rep['date']}{rep.get('down','')}</td><td>{rep.get('grade','Not graded')}</td></tr>")
    parts.append("""</tbody>
</table>""")
    return "".join(parts)
def down_html(file_str, file_name):
    """Build a data-URI download button for the given text content.

    The content is base64-encoded inline, so no server-side file is needed.
    """
    payload = base64.b64encode(file_str.encode('utf-8')).decode('utf-8')
    button = f"""<a href="data:application/octet-stream;base64,{payload}"
    download="{file_name}">
    <button style="border-radius: 8px; font-weight: bold;
    background-color: #e0e7ff; color: #6366f1; border: none; padding: 5px 10px;" >
    download</button>
    </a>"""
    return button