Spaces:
Sleeping
Sleeping
| import time | |
| import logging | |
| from tqdm import tqdm | |
| import numpy as np | |
| from copydetect.utils import (filter_code, highlight_overlap, get_copied_slices, | |
| get_document_fingerprints, find_fingerprint_overlap, | |
| get_token_coverage) | |
| from copydetect import defaults | |
| from dataclasses import dataclass, field | |
| from typing import Optional, List, Dict, ClassVar | |
| import re | |
@dataclass
class CopydetectConfig:
    """Configuration for a :class:`CopyDetector` run.

    Bug fix: the ``@dataclass`` decorator was missing even though the class
    uses ``field(...)`` and is instantiated as ``CopydetectConfig(**kwargs)``,
    which requires the generated ``__init__``.  ``window_size`` is now
    recomputed in ``__post_init__`` from the instance's actual thresholds
    instead of being frozen to the module defaults.
    """

    # In this modified detector the "dirs" lists hold code strings, not paths.
    test_dirs: List[str] = field(default_factory=list)
    ref_dirs: Optional[List[str]] = field(default_factory=list)
    boilerplate_dirs: Optional[List[str]] = field(default_factory=list)
    noise_t: int = defaults.NOISE_THRESHOLD
    guarantee_t: int = defaults.GUARANTEE_THRESHOLD
    display_t: float = defaults.DISPLAY_THRESHOLD
    disable_filtering: bool = False
    force_language: Optional[str] = None
    truncate: bool = False
    silent: bool = False
    encoding: str = "utf-8"
    # Derived winnowing window size; overwritten in __post_init__.
    window_size: int = field(init=False, default=guarantee_t - noise_t + 1)

    # Mapping from long CLI-style option names to the field names above.
    short_names: ClassVar[Dict[str, str]] = {
        "noise_threshold": "noise_t",
        "guarantee_threshold": "guarantee_t",
        "display_threshold": "display_t",
        "test_directories": "test_dirs",
        "reference_directories": "ref_dirs",
        "boilerplate_directories": "boilerplate_dirs",
    }

    def __post_init__(self):
        # Keep window_size consistent with the thresholds actually provided.
        self.window_size = self.guarantee_t - self.noise_t + 1

    def _check_arguments(self):
        """Validate (and, for numeric thresholds, coerce) configuration values.

        Raises:
            TypeError: when a value has the wrong type and cannot be coerced.
            ValueError: when thresholds are out of range or inconsistent.
        """
        if not isinstance(self.test_dirs, list):
            raise TypeError("Test directories must be a list")
        if not isinstance(self.ref_dirs, list):
            raise TypeError("Reference directories must be a list")
        if not isinstance(self.boilerplate_dirs, list):
            raise TypeError("Boilerplate directories must be a list")
        if not isinstance(self.disable_filtering, bool):
            raise TypeError("disable_filtering must be true or false")
        if self.force_language is not None:
            if not isinstance(self.force_language, str):
                raise TypeError("force_language must be a string")
        if not isinstance(self.truncate, bool):
            raise TypeError("truncate must be true or false")
        # Accept whole-valued floats (e.g. 25.0) for the thresholds.
        if not isinstance(self.noise_t, int):
            if int(self.noise_t) == self.noise_t:
                self.noise_t = int(self.noise_t)
                self.window_size = int(self.window_size)
            else:
                raise TypeError("Noise threshold must be an integer")
        if not isinstance(self.guarantee_t, int):
            if int(self.guarantee_t) == self.guarantee_t:
                self.guarantee_t = int(self.guarantee_t)
                self.window_size = int(self.window_size)
            else:
                raise TypeError("Guarantee threshold must be an integer")
        # value checking
        if self.guarantee_t < self.noise_t:
            raise ValueError(
                "Guarantee threshold must be greater than or "
                "equal to noise threshold"
            )
        if self.display_t > 1 or self.display_t < 0:
            raise ValueError("Display threshold must be between 0 and 1")
class CodeFingerprint:
    """Winnowing fingerprint of one piece of code held in memory.

    Note: unlike upstream copydetect, ``file`` here is the code itself
    (a str, or bytes when ``encoding == "DETECT"``), not a file path.
    """

    def __init__(self, file, k, win_size, boilerplate=None, filter=True,
                 encoding: str = "utf-8", force_language="python"):
        """Tokenize, filter, and fingerprint ``file``.

        Args:
            file: the code to fingerprint (str, or bytes for "DETECT").
            k: noise threshold / k-gram length.
            win_size: winnowing window size.
            boilerplate: hashes to exclude from the fingerprint.
            filter: when True, run language filtering before hashing.
            encoding: "DETECT" enables chardet-based decoding of bytes.
            force_language: language passed to the token filter.

        Raises:
            ModuleNotFoundError: if "DETECT" is requested without chardet.
        """
        if boilerplate is None:
            boilerplate = []
        if encoding == "DETECT":
            try:
                import chardet
                code = file
                detected_encoding = chardet.detect(code)["encoding"]
                if detected_encoding is not None:
                    code = code.decode(detected_encoding)
                else:
                    code = code.decode()
            except ModuleNotFoundError as e:
                logging.error("encoding detection requires chardet to be installed")
                raise e
        else:
            code = file
        if filter:
            # Normalize Python imports first so aliased/renamed imports do not
            # defeat the fingerprint comparison.
            if force_language == "python":
                code = self.modify_code(code)
            filtered_code, offsets = filter_code(code, None, force_language)
        else:
            filtered_code, offsets = code, np.array([])
        hashes, idx = get_document_fingerprints(filtered_code, k, win_size, boilerplate)
        self.raw_code = code
        self.filtered_code = filtered_code
        self.offsets = offsets  # filtered->raw index offsets (may be empty)
        self.hashes = hashes
        self.hash_idx = idx
        self.k = k
        self.token_coverage = get_token_coverage(idx, k, len(filtered_code))

    def modify_code(self, code):
        """Canonicalize Python import styles in ``code``.

        Rewrites "from mod import a, b" as "import mod" and prefixes the
        imported names (``a`` -> ``mod_a``); rewrites "import mod as m" as
        "import mod" and expands ``m.`` to ``mod_``; expands plain ``mod.``
        to ``mod_``; and normalizes double quotes to single quotes.

        Bug fix: module names may be dotted ("a.b"), so they are now passed
        through ``re.escape`` before being interpolated into patterns
        (previously the "." matched any character).
        """
        # Replace "from mod_name import el1, el2, el3, ..." with "import mod_name"
        # Collect all unique elements
        from_statements = re.findall(r'\bfrom\s+(\w+(?:\.\w+)*)\s+import\s+((?:\w+\s*,\s*)*\w+)\b', code)
        unique_elements = set()
        for mod_name, elements_str in from_statements:
            code = re.sub(rf'\bfrom\s+{re.escape(mod_name)}\s+import\s+((?:\w+\s*,\s*)*\w+)\b', f'import {mod_name}', code)
            elements = [e.strip() for e in elements_str.split(',')]
            unique_elements.update((mod_name, element) for element in elements)
        # Perform replacements (skip attribute accesses via the (?<!\.) guard)
        for mod_name, element in unique_elements:
            replacement = f'{mod_name}_{element}'
            code = re.sub(rf'(?<!\.)\b{re.escape(element)}\b', replacement, code)
        # Find and store import statements with aliases
        # Replace short_alias. with module_name_
        import_statements = re.findall(r'\bimport\s+(\w+(?:\.\w+)*)\s+as\s+(\w+)', code)
        for mod_name, short_alias in import_statements:
            replacement = rf'{mod_name}_'
            code = re.sub(rf'\b{re.escape(short_alias)}\.', replacement, code)
            code = re.sub(rf'\bimport\s+{re.escape(mod_name)}\s+as\s+{re.escape(short_alias)}\b', f'import {mod_name}', code)
        # Find and store import statements without aliases
        # Replace module_name. with module_name_
        import_statements = re.findall(r'\bimport\s+(\w+(?:\.\w+)*)\s', code)
        for mod_name in import_statements:
            replacement = rf'{mod_name}_'
            code = re.sub(rf'\b{re.escape(mod_name)}\.', replacement, code)
        code = code.replace('"', "'")
        return code
class CopyDetector:
    """Pairwise plagiarism detector over in-memory code strings.

    NOTE(review): unlike upstream copydetect, the ``*_dirs`` arguments here
    hold the code strings themselves, not directory paths — confirm with
    callers (compare_notebook passes cell sources directly).
    """

    def __init__(self, test_dirs=None, ref_dirs=None,
                 boilerplate_dirs=None,
                 noise_t=defaults.NOISE_THRESHOLD,
                 guarantee_t=defaults.GUARANTEE_THRESHOLD,
                 display_t=defaults.DISPLAY_THRESHOLD,
                 disable_filtering=False, force_language="python",
                 truncate=False, silent=False,
                 encoding: str = "utf-8"):
        # Collect every constructor argument except self; None values are
        # dropped so the config object falls back to its own defaults.
        conf_args = locals()
        conf_args = {
            key: val
            for key, val in conf_args.items()
            if key != "self" and val is not None
        }
        self.conf = CopydetectConfig(**conf_args)
        # Re-apply noise_t and recompute the winnowing window from the actual
        # thresholds (the config-level default is computed from module defaults).
        self.conf.noise_t=noise_t
        self.conf.window_size=guarantee_t-noise_t+1
        # "files" below are the code strings passed via the *_dirs arguments.
        self.test_files = self.conf.test_dirs
        self.ref_files = self.conf.ref_dirs
        self.boilerplate_files = self.conf.boilerplate_dirs
        self.similarity_matrix = np.array([])     # (tests, refs, 2) after run()
        self.token_overlap_matrix = np.array([])  # (tests, refs) after run()
        self.slice_matrix = {}  # (i, j) -> [test slices, ref slices]
        self.file_data = {}     # sequential fid -> CodeFingerprint

    def _get_boilerplate_hashes(self):
        """Fingerprint every boilerplate document and return its unique hashes.

        The returned hashes are excluded from test/reference fingerprints.
        """
        boilerplate_hashes = []
        for file in self.boilerplate_files:
            try:
                fingerprint = CodeFingerprint(
                    file,
                    k=self.conf.noise_t,
                    # win_size=1 keeps every k-gram hash (no winnowing) so the
                    # full set of boilerplate hashes can be excluded.
                    win_size=1, #?? self.conf.window_size
                    filter=not self.conf.disable_filtering,
                    encoding=self.conf.encoding,
                    force_language=self.conf.force_language
                )
                boilerplate_hashes.extend(fingerprint.hashes)
            except UnicodeDecodeError:
                logging.warning(f"Skipping {file}: file not UTF-8 text")
                continue
        return np.unique(np.array(boilerplate_hashes))

    def _preprocess_code(self, file_list):
        """Fingerprint each code string in file_list into self.file_data.

        Keys are sequential integer ids in iteration order; run() relies on
        test files occupying ids [0, len(test_files)) and reference files
        occupying the ids after them.
        """
        boilerplate_hashes = self._get_boilerplate_hashes()
        fid=0
        for code_f in file_list:
            try:
                self.file_data[fid] = CodeFingerprint(
                    code_f, self.conf.noise_t, self.conf.window_size,
                    boilerplate_hashes, not self.conf.disable_filtering,
                    encoding=self.conf.encoding, force_language=self.conf.force_language)
            except UnicodeDecodeError:
                # NOTE(review): skipping without incrementing fid shifts every
                # later id, desynchronizing file_data from the indices used in
                # run() — confirm skips cannot occur for in-memory strings.
                logging.warning(f"Skipping {code_f}: file not UTF-8 text")
                continue
            fid+=1

    def compare_files(self, file1_data, file2_data):
        """Compare two CodeFingerprint objects.

        Returns:
            (token_overlap1, (similarity1, similarity2), (slices1, slices2))
            where similarities are overlap / token coverage per file and the
            slices are mapped back into raw-code coordinates.

        Raises:
            ValueError: if the fingerprints were built with different k.
        """
        if file1_data.k != file2_data.k:
            raise ValueError("Code fingerprints must use the same noise threshold")
        idx1, idx2 = find_fingerprint_overlap(
            file1_data.hashes, file2_data.hashes,
            file1_data.hash_idx, file2_data.hash_idx)
        slices1 = get_copied_slices(idx1, file1_data.k)
        slices2 = get_copied_slices(idx2, file2_data.k)
        if len(slices1[0]) == 0:
            # no overlapping fingerprints at all
            return 0, (0,0), (np.array([]), np.array([]))
        token_overlap1 = np.sum(slices1[1] - slices1[0])
        token_overlap2 = np.sum(slices2[1] - slices2[0])
        # similarity = overlapping tokens / tokens covered by fingerprints
        if len(file1_data.filtered_code) > 0:
            similarity1 = token_overlap1 / file1_data.token_coverage
        else:
            similarity1 = 0
        if len(file2_data.filtered_code) > 0:
            similarity2 = token_overlap2 / file2_data.token_coverage
        else:
            similarity2 = 0
        # Map slice boundaries from filtered-code coordinates back to raw-code
        # coordinates using the per-position offsets recorded by filter_code.
        if len(file1_data.offsets) > 0:
            slices1 += file1_data.offsets[:,1][np.clip(
                np.searchsorted(file1_data.offsets[:,0], slices1),
                0, file1_data.offsets.shape[0] - 1)]
        if len(file2_data.offsets) > 0:
            slices2 += file2_data.offsets[:,1][np.clip(
                np.searchsorted(file2_data.offsets[:,0], slices2),
                0, file2_data.offsets.shape[0] - 1)]
        return token_overlap1, (similarity1,similarity2), (slices1,slices2)

    def run(self):
        """Fingerprint all inputs and fill the similarity/overlap matrices."""
        start_time = time.time()
        if not self.conf.silent:
            print(" 0.00: Generating file fingerprints")
        self._preprocess_code(self.test_files + self.ref_files)
        # similarity_matrix[i, j] = (test-side sim, ref-side sim); -1 = unset
        self.similarity_matrix = np.full(
            (len(self.test_files), len(self.ref_files), 2),
            -1,
            dtype=np.float64,
        )
        self.token_overlap_matrix = np.full(
            (len(self.test_files), len(self.ref_files)), -1
        )
        self.slice_matrix = {}
        if not self.conf.silent:
            print(f"{time.time()-start_time:6.2f}: Beginning code comparison")
        comparisons = {}  # NOTE(review): written below but never read
        for i, test_f in enumerate(
            tqdm(self.test_files,
                 bar_format= ' {l_bar}{bar}{r_bar}',
                 disable=self.conf.silent)
        ):
            for j, ref_f in enumerate(self.ref_files):
                # reference fingerprints sit after the test ones in file_data
                overlap, (sim1, sim2), (slices1, slices2) = self.compare_files(
                    self.file_data[i], self.file_data[j+len(self.test_files)]
                )
                comparisons[(i, j)] = (i, j)
                if slices1.shape[0] != 0:
                    self.slice_matrix[(i, j)] = [slices1, slices2]
                self.similarity_matrix[i, j] = np.array([sim1, sim2])
                self.token_overlap_matrix[i, j] = overlap
        if not self.conf.silent:
            print(f"{time.time()-start_time:6.2f}: Code comparison completed")

    def get_copied_code_list(self):
        """Return highlighted code pairs above the display threshold.

        Each entry is [test_sim, ref_sim, test_idx, ref_idx,
        highlighted_test_html, highlighted_ref_html, token_overlap],
        sorted by descending test similarity. Must be called after run().
        """
        if len(self.similarity_matrix) == 0:
            logging.error("Cannot generate code list: no files compared")
            return []
        x,y = np.where(self.similarity_matrix[:,:,0] > self.conf.display_t)
        code_list = []
        file_pairs = set()
        for idx in range(len(x)):
            test_f = x[idx]
            ref_f = y[idx]
            if (ref_f, test_f) in file_pairs:
                # if comparison is already in report, don't add it again
                continue
            file_pairs.add((test_f, ref_f))
            test_sim = self.similarity_matrix[x[idx], y[idx], 0]
            ref_sim = self.similarity_matrix[x[idx], y[idx], 1]
            if (test_f, ref_f) in self.slice_matrix:
                slices_test = self.slice_matrix[(test_f, ref_f)][0]
                slices_ref = self.slice_matrix[(test_f, ref_f)][1]
            else:
                # only the mirrored pair was stored; swap the slice order
                slices_test = self.slice_matrix[(ref_f, test_f)][1]
                slices_ref = self.slice_matrix[(ref_f, test_f)][0]
            if self.conf.truncate:
                truncate = 10   # limit highlighted context around matches
            else:
                truncate = -1   # show complete files
            hl_code_1, _ = highlight_overlap(
                self.file_data[test_f].raw_code, slices_test,
                "<font color='red'>", "</font>",
                truncate=truncate, escape_html=True)
            hl_code_2, _ = highlight_overlap(
                self.file_data[ref_f+len(self.test_files)].raw_code, slices_ref,
                "<font color='green'>", "</font>",
                truncate=truncate, escape_html=True)
            overlap = self.token_overlap_matrix[x[idx], y[idx]]
            code_list.append([test_sim, ref_sim, test_f, ref_f,
                              hl_code_1, hl_code_2, overlap])
        code_list.sort(key=lambda x: -x[0])
        return code_list
def infos_title(report_title):
    """Extract the two student names and the generation timestamp from a
    report-title HTML snippet.

    Args:
        report_title: HTML containing "<b>StudentN:</b> name <b>email:</b>"
            segments and a "<b>Report generated at:</b> YYYY-MM-DD HH:MM:SS"
            segment.

    Returns:
        (name1, name2, generated_at) — any part missing from the input is
        returned as the empty string.
    """
    full_name1_extracted, full_name2_extracted, generation_datetime = "", "", ""
    # Fix: removed the stray backslash before '<' (a non-standard regex escape).
    pattern = re.compile(r"<b>Student\d:</b>\s*(.*?)\s*<b>email:</b>")
    generation_datetime_pattern = re.compile(r"<b>Report generated at:</b> (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")
    matches = pattern.findall(report_title)
    generation_datetime_match = generation_datetime_pattern.search(report_title)
    if len(matches) > 0:
        full_name1_extracted = matches[0]
    if len(matches) > 1:
        full_name2_extracted = matches[1]
    if generation_datetime_match:
        generation_datetime = generation_datetime_match.group(1)
    return full_name1_extracted, full_name2_extracted, generation_datetime
def get_notebook_infos(notebook, add_id=False):
    """Concatenate a notebook's code and markdown cell sources.

    Args:
        notebook: nbformat-style notebook — cells expose ``cell_type`` as an
            attribute and ``source`` / ``execution_count`` via indexing.
        add_id: kept for backward compatibility; currently unused.

    Returns:
        ([codes], [markdowns], errors) — single-element lists holding the
        concatenated code and markdown text, and ``errors`` True when any
        non-empty code cell has a falsy execution_count (never executed).
    """
    codes = ""
    markdowns = ""
    errors = False
    for cell in notebook.cells:
        if cell.cell_type == 'code':
            text = cell["source"]
            if len(text) > 0:
                codes += text
                if not cell["execution_count"]:
                    errors = True
        elif cell.cell_type == 'markdown':
            text = cell["source"]
            if len(text) > 0:
                markdowns += text
    return [codes], [markdowns], errors
def compare_notebook(notebook1, notebook2, boiler=None, boiler_m=None, noise_t=5, guarantee_t=9):
    """Compare two notebooks' code and markdown content for similarity.

    Args:
        notebook1, notebook2: nbformat-style notebooks.
        boiler: boilerplate code strings to exclude (default: none).
        boiler_m: boilerplate markdown strings to exclude (default: none).
        noise_t, guarantee_t: copydetect thresholds.

    Returns:
        (similarity, errors_n1) — similarity maps 0 -> code similarity and
        1 -> markdown similarity (pairwise minimum, then best match), and
        errors_n1 is True when notebook1 has a never-executed code cell.
    """
    # Fix: the original used mutable default arguments (boiler=[], boiler_m=[]).
    if boiler is None:
        boiler = []
    if boiler_m is None:
        boiler_m = []
    codes_n1, markdowns_n1, errors_n1 = get_notebook_infos(notebook1, add_id=True)
    codes_n2, markdowns_n2, errors_n2 = get_notebook_infos(notebook2)
    test_dirs = codes_n1
    ref_dirs = codes_n2
    codes_sim = []
    if len(test_dirs) > 0 and len(ref_dirs) > 0:
        boilerplate_dirs = boiler
        detector = CopyDetector(test_dirs=test_dirs, boilerplate_dirs=boilerplate_dirs, ref_dirs=ref_dirs, force_language="python", noise_t=noise_t, guarantee_t=guarantee_t, display_t=0.5, silent=True)
        detector.run()
        # min over the (test-side, ref-side) similarity pair, then the best
        # reference match for each test document
        sm = detector.similarity_matrix.min(axis=2)
        codes_sim = sm.max(axis=1)
    test_dirs = markdowns_n1
    ref_dirs = markdowns_n2
    texts_sim = []
    if len(test_dirs) > 0 and len(ref_dirs) > 0:
        boilerplate_dirs = boiler_m
        # markdown is compared verbatim, so code filtering is disabled
        detector_m = CopyDetector(test_dirs=test_dirs, boilerplate_dirs=boilerplate_dirs, ref_dirs=ref_dirs, noise_t=noise_t, guarantee_t=guarantee_t, display_t=0.5, silent=True, disable_filtering=True)
        detector_m.run()
        sm_m = detector_m.similarity_matrix.min(axis=2)
        texts_sim = sm_m.max(axis=1)
    lc = list(codes_sim) + list(texts_sim)
    li = [0, 1]  # fixed keys: 0 = code similarity, 1 = markdown similarity
    similarity = dict(zip(li, lc))
    return similarity, errors_n1
def analyse_notebook(notebook, notebooks_ref, exceptkeys, ignore_code=None, ingnore_text=None):
    """Compare one notebook against a set of reference notebooks.

    Args:
        notebook: the notebook to check.
        notebooks_ref: mapping of student id -> {'report': notebook, ...}.
        exceptkeys: student ids to skip (e.g. the author's own submission).
        ignore_code: boilerplate code strings to exclude (default: none).
        ingnore_text: boilerplate markdown to exclude (default: none).
            NOTE: the misspelled name is kept for caller compatibility.

    Returns:
        (plagiarism, copiedfrom, err) — per-kind maximum similarity, the
        student id that produced each maximum, and the error flag from the
        last comparison performed.
    """
    # Fix: the original used mutable default arguments ([] literals).
    if ignore_code is None:
        ignore_code = []
    if ingnore_text is None:
        ingnore_text = []
    plagiarism = {}
    copiedfrom = {}
    err = False
    for suid, n_ref in notebooks_ref.items():
        if suid not in exceptkeys:
            sim, err = compare_notebook(notebook, n_ref['report'], boiler=ignore_code, boiler_m=ingnore_text)
            for k in sim:
                cplk = plagiarism.get(k, 0)
                if sim[k] >= cplk:
                    plagiarism[k] = sim[k]
                    copiedfrom[k] = suid
    return plagiarism, copiedfrom, err
| import base64 | |
# Custom CSS injected into the web UI: constrains the student/button column
# widths, recolors syntax-highlight token classes in the embedded report HTML
# so they stay readable, and restores visible scrollbars on `.scroll-hide`
# elements (which the UI framework otherwise hides).
custom_css = """
.student{
max-width: 100px !important;
}
.button{
max-width: 350px !important;
}
.htm span .dd, .n, .nn, .nb, .p, .bp{
color: black !important;
}
.htm .highlight pre{
color: black !important;
}
.htm span .dd, .fm, .nc, .nf{
color: blue !important;
}
.htm span .dd, .nd{
color: magenta !important;
}
.test .scroll-hide::-webkit-scrollbar {
display: initial !important;
width: 12px !important;
background-color: #ddd !important;
}
.test .scroll-hide::-webkit-scrollbar-thumb {
background-color: #6366f1 !important;
}
.test .scroll-hide::-webkit-scrollbar-thumb:hover {
background-color: #6366f199 !important;
cursor: pointer;
}
"""
def plagia_error(rate, students, desc=""):
    """Render a colored one-line HTML warning for a similarity rate.

    Severity color: red at >= 80%, orange at >= 50%, green otherwise.
    """
    if rate >= 80:
        severity = "red"
    elif rate >= 50:
        severity = "orange"
    else:
        severity = "green"
    return (
        f"<div style='color: {severity}; font-size: 12px;'>"
        f"{desc}: max similarity rate {rate}%, id:{students}</div>"
    )
def user_html(email, photoUrl, expiresAt):
    """Render the logged-in user header (photo, email, expiry, logout button).

    Args:
        email: user's email address.
        photoUrl: URL of the user's avatar image.
        expiresAt: session expiry as a POSIX timestamp (seconds).

    Returns:
        An HTML fragment as a string.
    """
    # Fix: `datetime` was never imported at module level, so the original
    # raised NameError at call time; import locally to keep the fix contained.
    from datetime import datetime
    expireTime = datetime.fromtimestamp(expiresAt)
    str_show = f"""
    <div>
        <div style='width:64; float:left; margin:8px;'>
            <img src="{photoUrl}" width="64" height="64"/>
        </div>
        <div style=' float:left; padding:8px;'>
            {email}<br/>
            <span id='expire'>Session expires at: {expireTime}</span>
            <br/>
            <a href="/logout">
            <button
                style='border-radius: 8px; font-weight: bold; background-color: #e0e7ff; color: #6366f1; border: none; padding: 5px 10px;'>
                Logout
            </button></a>
        </div>"""
    return str_show
def reports_html(reports):
    """Build an HTML table listing uploaded reports, numbered newest-first."""
    head = """<style>
table {width: 100%;border-collapse: collapse;margin-top: 20px;}
th, td {border: 1px solid #ddd;padding: 8px;text-align: left;}
th {background-color: #AAA; padding: 8px}
</style>
<table>
<thead><tr><th>Students</th><th>Report Uploaded At</th><th>Grading</th></tr></thead>
<tbody>
"""
    total = len(reports)
    # Row number counts down so the first entry shows the highest number.
    rows = [
        f"<tr><td>{total - position}. {rep['students']}</td>"
        f"<td>{rep['date']}{rep.get('down', '')}</td>"
        f"<td>{rep.get('grade', 'Not graded')}</td></tr>"
        for position, rep in enumerate(reports)
    ]
    tail = """</tbody>
</table>"""
    return head + "".join(rows) + tail
def down_html(file_str, file_name):
    """Build an HTML download button embedding ``file_str`` as a base64
    data URI named ``file_name``."""
    payload = base64.b64encode(file_str.encode('utf-8')).decode('utf-8')
    anchor_open = f'<a href="data:application/octet-stream;base64,{payload}"\ndownload="{file_name}">'
    button = ('<button style="border-radius: 8px; font-weight: bold;\n'
              'background-color: #e0e7ff; color: #6366f1; border: none; padding: 5px 10px;" >\n'
              'download</button>')
    return anchor_open + "\n" + button + "\n</a>"