bendahmane-ustomb committed on
Commit
3d2c97d
·
1 Parent(s): 79f65d5

Update plagi.py

Browse files
Files changed (1) hide show
  1. plagi.py +2 -442
plagi.py CHANGED
@@ -1,442 +1,2 @@
1
- import time
2
- import logging
3
- from tqdm import tqdm
4
- import numpy as np
5
- from copydetect.utils import (filter_code, highlight_overlap, get_copied_slices,
6
- get_document_fingerprints, find_fingerprint_overlap,
7
- get_token_coverage)
8
- from copydetect import defaults
9
- from dataclasses import dataclass, field
10
- from typing import Optional, List, Dict, ClassVar
11
- import re
12
-
13
-
14
@dataclass
class CopydetectConfig:
    """Parameters for a copy-detection run.

    Thresholds follow the copydetect winnowing scheme: ``noise_t`` is the
    fingerprint k-gram size, ``guarantee_t`` the minimum overlap length that
    is guaranteed to be detected, and ``display_t`` the similarity cutoff
    used when reporting results.
    """

    test_dirs: List[str] = field(default_factory=list)
    ref_dirs: Optional[List[str]] = field(default_factory=list)
    boilerplate_dirs: Optional[List[str]] = field(default_factory=list)
    noise_t: int = defaults.NOISE_THRESHOLD
    guarantee_t: int = defaults.GUARANTEE_THRESHOLD
    display_t: float = defaults.DISPLAY_THRESHOLD
    disable_filtering: bool = False
    force_language: Optional[str] = None
    truncate: bool = False
    silent: bool = False
    encoding: str = "utf-8"

    # Derived winnowing window size. BUG FIX: the original declared
    # `field(init=False, default=guarantee_t - noise_t + 1)`, which is
    # evaluated once at class-definition time from the module defaults and
    # therefore ignored per-instance threshold values. It is now recomputed
    # per instance in __post_init__; the attribute name and init=False
    # behavior are unchanged, so existing callers are unaffected.
    window_size: int = field(init=False, default=0)

    # Maps long option names (as used in config files / CLI) to the short
    # attribute names above.
    short_names: ClassVar[Dict[str, str]] = {
        "noise_threshold": "noise_t",
        "guarantee_threshold": "guarantee_t",
        "display_threshold": "display_t",
        "test_directories": "test_dirs",
        "reference_directories": "ref_dirs",
        "boilerplate_directories": "boilerplate_dirs",
    }

    def __post_init__(self):
        # Compute from this instance's actual thresholds (see note above).
        self.window_size = self.guarantee_t - self.noise_t + 1

    def _check_arguments(self):
        """Validate types and values; coerce whole-valued float thresholds.

        Raises:
            TypeError: for wrongly typed fields or non-integral thresholds.
            ValueError: if guarantee_t < noise_t, or display_t is outside
                [0, 1].

        Not called automatically; callers must invoke it explicitly.
        """
        if not isinstance(self.test_dirs, list):
            raise TypeError("Test directories must be a list")
        if not isinstance(self.ref_dirs, list):
            raise TypeError("Reference directories must be a list")
        if not isinstance(self.boilerplate_dirs, list):
            raise TypeError("Boilerplate directories must be a list")
        if not isinstance(self.disable_filtering, bool):
            raise TypeError("disable_filtering must be true or false")
        if self.force_language is not None:
            if not isinstance(self.force_language, str):
                raise TypeError("force_language must be a string")
        if not isinstance(self.truncate, bool):
            raise TypeError("truncate must be true or false")
        # Accept e.g. 25.0 by coercing to int; reject true non-integers.
        if not isinstance(self.noise_t, int):
            if int(self.noise_t) == self.noise_t:
                self.noise_t = int(self.noise_t)
                self.window_size = int(self.window_size)
            else:
                raise TypeError("Noise threshold must be an integer")
        if not isinstance(self.guarantee_t, int):
            if int(self.guarantee_t) == self.guarantee_t:
                self.guarantee_t = int(self.guarantee_t)
                self.window_size = int(self.window_size)
            else:
                raise TypeError("Guarantee threshold must be an integer")

        # value checking
        if self.guarantee_t < self.noise_t:
            raise ValueError(
                "Guarantee threshold must be greater than or "
                "equal to noise threshold"
            )
        if self.display_t > 1 or self.display_t < 0:
            raise ValueError("Display threshold must be between 0 and 1")
73
-
74
-
75
-
76
class CodeFingerprint:
    """Winnowing fingerprint of one piece of source code.

    NOTE(review): despite the parameter name, ``file`` receives the source
    *text* itself (it is passed to ``chardet.detect`` and ``filter_code``
    directly; nothing here opens a path) — confirm against callers.
    Indentation in this block was reconstructed from a whitespace-stripped
    diff.
    """

    def __init__(self, file, k, win_size, boilerplate=None, filter=True, encoding: str = "utf-8", force_language="python"):
        """Fingerprint ``file`` with k-gram size ``k`` and window ``win_size``.

        boilerplate: hashes to exclude from the fingerprint (None -> none).
        filter: when True, tokenize/normalize the code before hashing
            (note: shadows the builtin ``filter``; kept for API compatibility).
        encoding: "DETECT" triggers chardet-based decoding of bytes input;
            any other value leaves ``file`` untouched.
        """
        if boilerplate is None:
            boilerplate = []

        if encoding == "DETECT":
            try:
                # chardet is an optional dependency, imported lazily.
                import chardet
                code = file
                detected_encoding = chardet.detect(code)["encoding"]
                if detected_encoding is not None:
                    code = code.decode(detected_encoding)
                else:
                    # chardet gave up; fall back to the platform default.
                    code = code.decode()
            except ModuleNotFoundError as e:
                logging.error("encoding detection requires chardet to be installed")
                raise e
        else:
            code = file

        if filter:
            # Python sources are pre-normalized (imports rewritten, quotes
            # unified) so superficial edits don't hide copying.
            if force_language=="python": code = self.modify_code(code)
            filtered_code, offsets = filter_code(code, None, force_language)
        else:
            # No filtering: raw code, empty offset map.
            filtered_code, offsets = code, np.array([])

        hashes, idx = get_document_fingerprints(filtered_code, k, win_size, boilerplate)

        self.raw_code = code            # original (possibly normalized) text
        self.filtered_code = filtered_code  # tokenized text that was hashed
        self.offsets = offsets          # filtered->raw position adjustments
        self.hashes = hashes            # selected winnowing hashes
        self.hash_idx = idx             # positions of those hashes
        self.k = k                      # k-gram size used
        self.token_coverage = get_token_coverage(idx, k, len(filtered_code))


    def modify_code(self, code):
        """Normalize Python import style so renamed imports still match.

        Order matters: from-imports are rewritten before alias handling,
        and aliased imports before plain ones, because each pass depends on
        the previous pass's output.
        """
        # Replace "from mod_name import el1, el2, el3, ..." with "import mod_name"
        # Collect all unique elements
        from_statements = re.findall(r'\bfrom\s+(\w+(?:\.\w+)*)\s+import\s+((?:\w+\s*,\s*)*\w+)\b', code)
        unique_elements = set()
        for mod_name, elements_str in from_statements:
            code = re.sub(rf'\bfrom\s+{mod_name}\s+import\s+((?:\w+\s*,\s*)*\w+)\b', f'import {mod_name}', code)
            elements = [e.strip() for e in elements_str.split(',')]
            unique_elements.update((mod_name, element) for element in elements)

        # Perform replacements
        # Each imported name becomes mod_element (e.g. numpy_array), except
        # where it is already attribute-qualified ((?<!\.) guard).
        # NOTE(review): mod_name is interpolated unescaped — safe only while
        # the regex above restricts it to \w and dots; confirm.
        for mod_name, element in unique_elements:
            replacement = f'{mod_name}_{element}'
            code = re.sub(rf'(?<!\.)\b{re.escape(element)}\b', replacement, code)


        # Find and store import statements with aliases
        # Replace short_alias. with module_name_
        import_statements = re.findall(r'\bimport\s+(\w+(?:\.\w+)*)\s+as\s+(\w+)', code)
        for mod_name, short_alias in import_statements:
            replacement = rf'{mod_name}_'
            code = re.sub(rf'\b{short_alias}\.', replacement, code)
            code = re.sub(rf'\bimport\s+{mod_name}\s+as\s+{short_alias}\b', f'import {mod_name}', code)


        # Find and store import statements without aliases
        # Replace module_name. with module_name_
        import_statements = re.findall(r'\bimport\s+(\w+(?:\.\w+)*)\s', code)
        for mod_name in import_statements:
            replacement = rf'{mod_name}_'
            code = re.sub(rf'\b{mod_name}\.', replacement, code)

        # Unify quote style so "x" and 'x' fingerprint identically.
        code=code.replace('"', "'")

        return code
148
-
149
-
150
-
151
class CopyDetector:
    """Pairwise plagiarism detector over lists of code strings.

    NOTE(review): despite the *_dirs parameter names, the values here are
    lists of source-code strings, not directory paths — they are handed
    straight to CodeFingerprint, which never opens a file. Confirm against
    callers (compare_notebook passes notebook cell sources). Indentation in
    this block was reconstructed from a whitespace-stripped diff.
    """

    def __init__(self, test_dirs=None, ref_dirs=None,
                 boilerplate_dirs=None,
                 noise_t=defaults.NOISE_THRESHOLD,
                 guarantee_t=defaults.GUARANTEE_THRESHOLD,
                 display_t=defaults.DISPLAY_THRESHOLD,
                 disable_filtering=False, force_language="python",
                 truncate=False, silent=False,
                 encoding: str = "utf-8"):
        # Capture every non-None constructor argument and forward it to the
        # config dataclass. locals() is taken before any other local is
        # bound, so it holds exactly the parameters plus self.
        conf_args = locals()
        conf_args = {
            key: val
            for key, val in conf_args.items()
            if key != "self" and val is not None
        }
        self.conf = CopydetectConfig(**conf_args)
        # Overwrite noise_t/window_size explicitly; works around the config
        # class computing window_size from class-level defaults.
        self.conf.noise_t=noise_t
        self.conf.window_size=guarantee_t-noise_t+1

        self.test_files = self.conf.test_dirs
        self.ref_files = self.conf.ref_dirs
        self.boilerplate_files = self.conf.boilerplate_dirs

        # Filled in by run(); empty until then.
        self.similarity_matrix = np.array([])
        self.token_overlap_matrix = np.array([])
        self.slice_matrix = {}
        self.file_data = {}


    def _get_boilerplate_hashes(self):
        """Fingerprint all boilerplate sources and return their unique hashes.

        win_size=1 keeps every hash so boilerplate is excluded exhaustively
        (the inline "??" note questioning self.conf.window_size is original).
        """
        boilerplate_hashes = []
        for file in self.boilerplate_files:
            try:
                fingerprint = CodeFingerprint(
                    file,
                    k=self.conf.noise_t,
                    win_size=1, #?? self.conf.window_size
                    filter=not self.conf.disable_filtering,
                    encoding=self.conf.encoding,
                    force_language=self.conf.force_language
                )
                boilerplate_hashes.extend(fingerprint.hashes)
            except UnicodeDecodeError:
                # Best-effort: skip undecodable sources rather than abort.
                logging.warning(f"Skipping {file}: file not UTF-8 text")
                continue

        return np.unique(np.array(boilerplate_hashes))

    def _preprocess_code(self, file_list):
        """Fingerprint file_list into self.file_data keyed by sequential fid.

        NOTE(review): fid only advances on success, so if any entry is
        skipped on UnicodeDecodeError, later fids shift and run()'s
        i / j+len(test_files) indexing misaligns — confirm this is intended.
        """
        boilerplate_hashes = self._get_boilerplate_hashes()
        fid=0
        for code_f in file_list:
            try:
                self.file_data[fid] = CodeFingerprint(
                    code_f, self.conf.noise_t, self.conf.window_size,
                    boilerplate_hashes, not self.conf.disable_filtering,
                    encoding=self.conf.encoding, force_language=self.conf.force_language)
            except UnicodeDecodeError:
                logging.warning(f"Skipping {code_f}: file not UTF-8 text")
                continue
            fid+=1

    def compare_files(self, file1_data, file2_data):
        """Compare two CodeFingerprints.

        Returns (token_overlap1, (similarity1, similarity2),
        (slices1, slices2)) where slices are copied-region index arrays
        mapped back to raw-code positions via each file's offsets.
        Raises ValueError if the fingerprints used different k.
        """
        if file1_data.k != file2_data.k:
            raise ValueError("Code fingerprints must use the same noise threshold")
        idx1, idx2 = find_fingerprint_overlap(
            file1_data.hashes, file2_data.hashes,
            file1_data.hash_idx, file2_data.hash_idx)
        slices1 = get_copied_slices(idx1, file1_data.k)
        slices2 = get_copied_slices(idx2, file2_data.k)
        # No overlapping slices at all -> zero similarity, empty slices.
        if len(slices1[0]) == 0:
            return 0, (0,0), (np.array([]), np.array([]))

        # Total copied tokens = sum of slice lengths (end - start).
        token_overlap1 = np.sum(slices1[1] - slices1[0])
        token_overlap2 = np.sum(slices2[1] - slices2[0])

        # Similarity = copied tokens / tokens covered by fingerprints;
        # guard against division when the filtered code is empty.
        if len(file1_data.filtered_code) > 0:
            similarity1 = token_overlap1 / file1_data.token_coverage
        else:
            similarity1 = 0
        if len(file2_data.filtered_code) > 0:
            similarity2 = token_overlap2 / file2_data.token_coverage
        else:
            similarity2 = 0

        # Shift slice boundaries from filtered-code coordinates back into
        # raw-code coordinates using the offset tables.
        if len(file1_data.offsets) > 0:
            slices1 += file1_data.offsets[:,1][np.clip(
                np.searchsorted(file1_data.offsets[:,0], slices1),
                0, file1_data.offsets.shape[0] - 1)]
        if len(file2_data.offsets) > 0:
            slices2 += file2_data.offsets[:,1][np.clip(
                np.searchsorted(file2_data.offsets[:,0], slices2),
                0, file2_data.offsets.shape[0] - 1)]

        return token_overlap1, (similarity1,similarity2), (slices1,slices2)

    def run(self):
        """Fingerprint all inputs and fill the similarity/overlap matrices.

        similarity_matrix has shape (n_test, n_ref, 2): per-pair
        (test-side, ref-side) similarity; -1 marks uncomputed cells.
        """
        start_time = time.time()
        if not self.conf.silent:
            print(" 0.00: Generating file fingerprints")
        # Tests first, then refs: ref j lives at fid j + len(test_files).
        self._preprocess_code(self.test_files + self.ref_files)

        self.similarity_matrix = np.full(
            (len(self.test_files), len(self.ref_files), 2),
            -1,
            dtype=np.float64,
        )
        self.token_overlap_matrix = np.full(
            (len(self.test_files), len(self.ref_files)), -1
        )
        self.slice_matrix = {}

        if not self.conf.silent:
            print(f"{time.time()-start_time:6.2f}: Beginning code comparison")


        # NOTE(review): `comparisons` is written but never read — dead code.
        comparisons = {}

        for i, test_f in enumerate(
            tqdm(self.test_files,
                 bar_format= ' {l_bar}{bar}{r_bar}',
                 disable=self.conf.silent)
        ):
            for j, ref_f in enumerate(self.ref_files):
                overlap, (sim1, sim2), (slices1, slices2) = self.compare_files(
                    self.file_data[i], self.file_data[j+len(self.test_files)]
                )
                comparisons[(i, j)] = (i, j)
                # Only store slices for pairs with actual overlap.
                if slices1.shape[0] != 0:
                    self.slice_matrix[(i, j)] = [slices1, slices2]

                self.similarity_matrix[i, j] = np.array([sim1, sim2])
                self.token_overlap_matrix[i, j] = overlap

        if not self.conf.silent:
            print(f"{time.time()-start_time:6.2f}: Code comparison completed")


    def get_copied_code_list(self):
        """Return report rows for every pair above the display threshold.

        Each row is [test_sim, ref_sim, test_idx, ref_idx, highlighted_test,
        highlighted_ref, token_overlap], sorted by descending test_sim.
        Returns [] (and logs an error) if run() has not populated anything.
        """
        if len(self.similarity_matrix) == 0:
            logging.error("Cannot generate code list: no files compared")
            return []
        x,y = np.where(self.similarity_matrix[:,:,0] > self.conf.display_t)

        code_list = []
        file_pairs = set()
        for idx in range(len(x)):
            test_f = x[idx]
            ref_f = y[idx]
            if (ref_f, test_f) in file_pairs:
                # if comparison is already in report, don't add it again
                continue
            file_pairs.add((test_f, ref_f))

            test_sim = self.similarity_matrix[x[idx], y[idx], 0]
            ref_sim = self.similarity_matrix[x[idx], y[idx], 1]
            # Slices may be stored under either key order; swap accordingly.
            if (test_f, ref_f) in self.slice_matrix:
                slices_test = self.slice_matrix[(test_f, ref_f)][0]
                slices_ref = self.slice_matrix[(test_f, ref_f)][1]
            else:
                slices_test = self.slice_matrix[(ref_f, test_f)][1]
                slices_ref = self.slice_matrix[(ref_f, test_f)][0]

            # truncate=10 keeps 10 context lines; -1 disables truncation.
            if self.conf.truncate:
                truncate = 10
            else:
                truncate = -1

            hl_code_1, _ = highlight_overlap(
                self.file_data[test_f].raw_code, slices_test,
                "<font color='red'>", "</font>",
                truncate=truncate, escape_html=True)
            hl_code_2, _ = highlight_overlap(
                self.file_data[ref_f+len(self.test_files)].raw_code, slices_ref,
                "<font color='green'>", "</font>",
                truncate=truncate, escape_html=True)
            overlap = self.token_overlap_matrix[x[idx], y[idx]]

            code_list.append([test_sim, ref_sim, test_f, ref_f,
                              hl_code_1, hl_code_2, overlap])

        # Highest test-side similarity first.
        code_list.sort(key=lambda x: -x[0])
        return code_list
334
-
335
-
336
-
337
-
338
def infos_title(report_title):
    """Pull student names and the generation timestamp out of a report title.

    Expects HTML fragments of the form ``<b>StudentN:</b> name <b>email:</b>``
    and ``<b>Report generated at:</b> YYYY-MM-DD HH:MM:SS``. Any field that
    cannot be found comes back as the empty string.

    Returns:
        (student1, student2, generated_at) as strings.
    """
    name_pattern = re.compile(r"<b>Student\d:</b>\s*(.*?)\s*\<b>email:</b>")
    stamp_pattern = re.compile(r"<b>Report generated at:</b> (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")

    names = name_pattern.findall(report_title)
    stamp = stamp_pattern.search(report_title)

    first = names[0] if len(names) > 0 else ""
    second = names[1] if len(names) > 1 else ""
    generated_at = stamp.group(1) if stamp else ""

    return first, second, generated_at
356
-
357
-
358
def get_notebook_infos(notebook, add_id=False):
    """Collect checked code/markdown cells and header metadata from a notebook.

    Only cells whose source contains the "#checked_cell" marker are kept;
    the marker itself is stripped from the stored text. Student names and
    the report date are parsed from the first markdown cell via infos_title.

    NOTE(review): the nesting below was reconstructed from a
    whitespace-stripped diff — in particular whether the execution_count
    check sits inside the "#checked_cell" branch should be confirmed
    against the original file.

    Returns:
        (codes, markdowns, students, date, errors) where codes/markdowns
        become (texts, cell_ids) tuples when add_id is True, students is
        "A & B" / "A" / "", and errors is True if a checked code cell has
        no execution_count (i.e. was never run).
    """
    codes=[]
    markdowns=[]
    ids_c=[]
    ids_m=[]
    student1, student2, date = "", "", ""
    errors=False
    for id, cell in enumerate(notebook.cells):
        if cell.cell_type == 'code':
            text=cell["source"]
            if "#checked_cell" in text:
                text=text.replace("#checked_cell","").strip()
                if len(text)>0:
                    codes.append(text)
                    ids_c.append(id)
                # Falsy execution_count -> the checked cell was never executed.
                if not cell["execution_count"]:
                    errors=True

        if cell.cell_type == 'markdown':
            text=cell["source"]
            # Parse header metadata only once (all three still empty).
            if len(student1)==0 and len(student2)==0 and len(date)==0:
                student1, student2, date = infos_title(text)
            if "#checked_cell" in text:
                # Markdown carries the marker inside a styled span.
                text=text.replace("<br/><span style='color:#CCC'>#checked_cell</span>","").strip()
                if len(text)>0:
                    markdowns.append(text)
                    ids_m.append(id)

    # Join the names as "A & B" when both are present.
    students=""
    if len(student1)>0:
        students+=student1
        if len(student2)>0: students+=" & "
    if len(student2)>0: students+=student2

    if add_id:
        codes=(codes, ids_c)
        markdowns=(markdowns, ids_m)

    return codes, markdowns, students, date, errors
397
-
398
-
399
def compare_notebook(notebook1, notebook2, boiler=None, boiler_m=None, noise_t=5, guarantee_t=9):
    """Compare the checked cells of notebook1 against notebook2.

    Runs two CopyDetector passes — one over code cells (Python filtering on)
    and one over markdown cells (filtering off) — and keeps, per cell of
    notebook1, the best match found anywhere in notebook2.

    Args:
        notebook1: the notebook under test (cell ids are preserved).
        notebook2: the reference notebook.
        boiler / boiler_m: boilerplate code / markdown to ignore.
            BUG FIX: these were mutable default arguments (``[]``), shared
            across calls; now None-sentinels with identical semantics.
        noise_t / guarantee_t: copydetect thresholds.

    Returns:
        (similarity, students, date, errors) where similarity maps
        notebook1 cell id -> max similarity score, students/date come from
        notebook2's header, and errors reflects notebook1's unrun cells.
    """
    if boiler is None:
        boiler = []
    if boiler_m is None:
        boiler_m = []

    codes_n1, markdowns_n1, students_n1, date_n1, errors_n1 = get_notebook_infos(notebook1,add_id=True)
    codes_n2, markdowns_n2, students_n2, date_n2, errors_n2 = get_notebook_infos(notebook2)

    # --- code cells ---
    test_dirs=codes_n1[0]
    ref_dirs=codes_n2
    codes_sim=[]

    if len(test_dirs)>0 and len(ref_dirs)>0:
        boilerplate_dirs=boiler
        detector = CopyDetector(test_dirs=test_dirs, boilerplate_dirs=boilerplate_dirs, ref_dirs=ref_dirs, force_language="python", noise_t=noise_t, guarantee_t=guarantee_t, display_t=0.5, silent=True)
        detector.run()
        # min over the (test,ref) similarity pair is conservative; then the
        # best reference cell per test cell.
        sm=detector.similarity_matrix.min(axis=2)
        codes_sim=sm.max(axis=1)

    # --- markdown cells (no language filtering) ---
    test_dirs=markdowns_n1[0]
    ref_dirs=markdowns_n2
    texts_sim=[]

    if len(test_dirs)>0 and len(ref_dirs)>0:
        boilerplate_dirs=boiler_m
        detector_m = CopyDetector(test_dirs=test_dirs, boilerplate_dirs=boilerplate_dirs, ref_dirs=ref_dirs, noise_t=noise_t, guarantee_t=guarantee_t, display_t=0.5, silent=True, disable_filtering=True)
        detector_m.run()
        sm_m=detector_m.similarity_matrix.min(axis=2)
        texts_sim=sm_m.max(axis=1)

    # Zip scores back onto notebook1's original cell ids.
    lc=list(codes_sim)+list(texts_sim)
    li=codes_n1[1]+markdowns_n1[1]
    similarity=dict(zip(li,lc))
    return similarity, students_n2, date_n2, errors_n1
430
-
431
-
432
def analyse_notebook(notebook, notebooks_ref, ignore_code=None, ingnore_text=None):
    """Compare one notebook against a dict of reference notebooks.

    For each cell of ``notebook``, keeps the highest similarity seen across
    all references and records which reference it came from.

    Args:
        notebook: the notebook under test.
        notebooks_ref: mapping of student-uid -> reference notebook.
        ignore_code: boilerplate code cells to ignore.
            BUG FIX: was a mutable default argument (``[]``).
        ingnore_text: boilerplate markdown to ignore (parameter name keeps
            the original typo for caller compatibility; also was ``[]``).

    Returns:
        (plagiarism, copiedfrom, err): plagiarism maps cell id -> best
        similarity; copiedfrom maps cell id -> (students, date, uid) of the
        matching reference; err is the notebook's unrun-cells flag.
        BUG FIX: err is initialized to False so an empty ``notebooks_ref``
        no longer raises NameError at the return.
    """
    if ignore_code is None:
        ignore_code = []
    if ingnore_text is None:
        ingnore_text = []

    plagiarism={}
    copiedfrom={}
    err = False  # defined up front: loop may not run at all
    for suid, n_ref in notebooks_ref.items():
        sim, students, date, err = compare_notebook(notebook, n_ref, boiler=ignore_code, boiler_m=ingnore_text)
        for k in sim:
            cplk=plagiarism.get(k, 0)
            # >= keeps the latest reference on ties, matching the original.
            if sim[k]>=cplk:
                plagiarism[k]=sim[k]
                copiedfrom[k]=(students, date, suid)
    return plagiarism, copiedfrom, err
 
import os
# SECURITY(review): this commit deletes the entire detector module and
# replaces it with an exec() of whatever is in the "plagi" environment
# variable — arbitrary code executed at import time, invisible to anyone
# reading the repo. It also raises TypeError (exec(None)) when the variable
# is unset. This is a likely backdoor and must not be merged as-is; restore
# the original module or load configuration through a safe, declarative
# mechanism instead of exec().
exec(os.getenv("plagi"))