| import csv |
| import re |
| import tokenize |
| from io import StringIO |
| import os |
| from tqdm import tqdm |
| import json |
| import sys |
| from functools import lru_cache |
|
|
# Some rows hold entire source files in a single CSV field, far beyond the
# default 128 KiB limit.  sys.maxsize can overflow the C long used inside the
# csv module (notably on 64-bit Windows), so back off until a value sticks.
_max_field_size = sys.maxsize
while True:
    try:
        csv.field_size_limit(_max_field_size)
        break
    except OverflowError:
        _max_field_size //= 2
|
|
| |
|
|
| |
# Per-language single-line comment markers, applied line-by-line by
# count_comments; group(1) captures the comment text after the marker.
_LINE_COMMENT_PATTERNS = {
    "python": re.compile(r"#(.*)$"),
    "shell": re.compile(r"#(.*)$"),
    "r": re.compile(r"#(.*)$"),
    "matlab": re.compile(r"%(.*)$"),
    "fortran": re.compile(r"!(.*)$"),
    "c/c++": re.compile(r"//(.*)$"),
    "java": re.compile(r"//(.*)$"),
    "go": re.compile(r"//(.*)$"),
    "rust": re.compile(r"//(.*)$"),
}
|
|
| |
# Per-language block-comment regexes, matched non-greedily across lines.
# For Python this matches triple-quoted strings, which are only
# heuristically comments — count_comments filters out assigned literals.
_BLOCK_COMMENT_PATTERNS = {
    "python": re.compile(r'("""[\s\S]*?"""|\'\'\'[\s\S]*?\'\'\')'),
    "c/c++": re.compile(r"/\*([\s\S]*?)\*/"),
    "java": re.compile(r"/\*([\s\S]*?)\*/"),
    "rust": re.compile(r"/\*([\s\S]*?)\*/"),
    "go": re.compile(r"/\*([\s\S]*?)\*/"),
    "matlab": re.compile(r"%\{([\s\S]*?)%\}"),
}
|
|
| |
| _FUNCTION_PATTERNS = { |
| "python": re.compile(r"^[ \t]*def\s+(\w+)\s*\(([^)]*)\)", re.MULTILINE), |
| "java": re.compile(r""" |
| (?:public|protected|private|static|final|native|synchronized|abstract|\s)* |
| \s* |
| (?:[\w\<\>\[\],\s]+) |
| \s+ |
| (\w+) |
| \s*\(([^)]*)\) |
| (?:\s*throws\s+[\w,\s]+)? |
| \s*\{ |
| """, re.MULTILINE | re.VERBOSE), |
| "c/c++": re.compile(r""" |
| ^[ \t]* |
| (?!.*typedef) |
| (?!.*\#) |
| (?:[\w\*\s&]+) |
| \b(\w+)\s* |
| \(([^)]*)\) |
| \s*(?:const)? |
| \s*(?:override)? |
| \s*(?:noexcept)? |
| \s*\{ |
| """, re.MULTILINE | re.VERBOSE), |
| "go": re.compile(r"\bfunc\s+(?:\([^)]+\)\s*)?(\w+)\s*\(([^)]*)\)", re.MULTILINE), |
| "rust": re.compile(r"\b(?:pub\s+)?(?:async\s+)?fn\s+(\w+)\s*(?:<[^>]*>)?\s*\(([^)]*)\)", re.MULTILINE), |
| "r": re.compile(r"(\w+)\s*(?:<-|=)\s*function\s*\(([^)]*)\)", re.MULTILINE), |
| "matlab": re.compile(r"^[ \t]*function\s+(?:(?:\[?[\w,\s]*\]?\s*=\s*)?(\w+)|(\w+))\s*\(([^)]*)\)", re.MULTILINE), |
| "shell": re.compile(r"^[ \t]*(?:function\s+)?(\w+)\s*\(\)\s*\{", re.MULTILINE), |
| "fortran": re.compile(r""" |
| (?i) |
| ^[ \t]* |
| (?:recursive\s+)? |
| (?:pure\s+)? |
| (?:elemental\s+)? |
| (?:[\w\*]+(?:\s*\([^)]*\))?\s+)? |
| (function|subroutine)\s+ |
| (\w+)\s* |
| \(([^)]*)\) |
| """, re.MULTILINE | re.VERBOSE), |
| } |
|
|
| |
# Precompiled comment-removal regexes used by _remove_comments.  Line
# comments are deleted outright; block comments are replaced by the caller
# with newlines to preserve line numbering.
# NOTE(review): these are textual heuristics — a '#' or '//' inside a
# string literal is removed too; acceptable for rough statistics.
_REMOVE_COMMENT_PATTERNS = {
    "python_line": re.compile(r'#.*$', re.MULTILINE),
    "python_triple_dq": re.compile(r'"""[\s\S]*?"""'),
    "python_triple_sq": re.compile(r"'''[\s\S]*?'''"),
    "c_line": re.compile(r'//.*$', re.MULTILINE),
    "c_block": re.compile(r'/\*[\s\S]*?\*/'),
    "shell_line": re.compile(r'#.*$', re.MULTILINE),
    "matlab_line": re.compile(r'%.*$', re.MULTILINE),
    "matlab_block": re.compile(r'%\{[\s\S]*?%\}'),
    "fortran_line": re.compile(r'!.*$', re.MULTILINE),
}
|
|
# Extension → language table, built once at import time instead of being
# re-created on every call.  Keys are lowercase because the lookup lowercases
# the extension first (this also made the old ".F" entry unreachable — ".f"
# covers it).
_EXT_TO_LANG = {
    ".py": "python",
    ".java": "java",
    ".c": "c/c++",
    ".h": "c/c++",
    ".hh": "c/c++",
    ".hpp": "c/c++",
    ".cpp": "c/c++",
    ".cc": "c/c++",
    ".cxx": "c/c++",
    ".c++": "c/c++",
    ".f90": "fortran",
    ".f": "fortran",
    ".f95": "fortran",
    ".r": "r",
    ".m": "matlab",
    ".sh": "shell",
    ".bash": "shell",
    ".rs": "rust",
    ".go": "go",
}


def detect_language(file_path: str) -> str:
    """Guess the programming language from the file extension alone.

    Args:
        file_path: Path or bare filename; only the extension is inspected.

    Returns:
        A language key understood by the pattern tables (e.g. "python",
        "c/c++"), or the lowercased extension itself when it is not
        recognized (empty string for files with no extension).
    """
    ext = os.path.splitext(file_path)[1].lower().strip()
    return _EXT_TO_LANG.get(ext, ext)
|
|
|
|
def count_comments(code: str, lang: str):
    """Count comment lines and comment tokens in source code.

    Supports Python/Java/C/C++/Fortran/Matlab/R/Shell/Rust/Go/Jupyter via
    the precompiled regexes in _BLOCK_COMMENT_PATTERNS and
    _LINE_COMMENT_PATTERNS.

    Args:
        code: The source text to scan.
        lang: Language key as produced by detect_language().

    Returns:
        Tuple ``(comment_lines, comment_token_count)``: the number of lines
        that belong to a comment and the number of whitespace-separated
        tokens inside comment text.
    """
    # Jupyter notebooks are analyzed as Python source.
    if lang == "jupyter":
        lang = "python"

    comment_lines = 0
    comment_tokens = []
    lines = code.splitlines()

    # 0-based indices of lines already consumed by a block comment, so the
    # line-comment pass below does not count them a second time.
    block_comment_line_indices = set()

    if lang in _BLOCK_COMMENT_PATTERNS:
        patt = _BLOCK_COMMENT_PATTERNS[lang]

        if lang == "python":
            for match in patt.finditer(code):
                start_pos = match.start()
                end_pos = match.end()

                start_line = code[:start_pos].count('\n')
                end_line = code[:end_pos].count('\n')

                # Heuristic: a triple-quoted string directly preceded by '='
                # is an assigned string literal, not a docstring.  Bug fix:
                # the old code excluded such literals from the line-index set
                # but still added their lines/tokens to the totals — now they
                # are skipped entirely.
                prefix = code[max(0, start_pos - 20):start_pos].strip()
                if prefix.endswith('='):
                    continue

                for line_idx in range(start_line, end_line + 1):
                    block_comment_line_indices.add(line_idx)

                # Strip the delimiters; both `\"\"\"` and `'''` are three
                # characters, so one slice replaces the old duplicated
                # if/else branches that did the same thing.
                block_content = match.group(1)[3:-3]
                for b in block_content.splitlines():
                    comment_lines += 1
                    if b.strip():
                        comment_tokens.extend(b.strip().split())
        else:
            for match in patt.finditer(code):
                start_pos = match.start()
                end_pos = match.end()

                start_line = code[:start_pos].count('\n')
                end_line = code[:end_pos].count('\n')

                for line_idx in range(start_line, end_line + 1):
                    block_comment_line_indices.add(line_idx)

                # Prefer the inner capture group when the pattern has one.
                block_content = match.group(1) if match.lastindex else match.group(0)
                for b in block_content.splitlines():
                    comment_lines += 1
                    if b.strip():
                        comment_tokens.extend(b.strip().split())

    if lang in _LINE_COMMENT_PATTERNS:
        patt = _LINE_COMMENT_PATTERNS[lang]
        for line_idx, line in enumerate(lines):
            if line_idx in block_comment_line_indices:
                continue

            m = patt.search(line)
            if m:
                # Quote-parity heuristic: if the comment marker sits inside
                # an open string literal, an odd number of unescaped quotes
                # precedes it and the match is rejected.
                prefix = line[:m.start()]
                single_quotes = prefix.count("'") - prefix.count("\\'")
                double_quotes = prefix.count('"') - prefix.count('\\"')

                if single_quotes % 2 == 0 and double_quotes % 2 == 0:
                    comment_lines += 1
                    text = m.group(1)
                    if text:
                        comment_tokens.extend(text.strip().split())

    return comment_lines, len(comment_tokens)
|
|
|
|
def count_functions_and_parameters(code: str, lang: str):
    """Count function definitions and their declared parameters.

    Supports the languages listed in _FUNCTION_PATTERNS (Fortran
    subroutines/functions included); unknown languages yield (0, 0).

    Args:
        code: The source text to scan.
        lang: Language key as produced by detect_language().

    Returns:
        Tuple ``(function_count, parameter_count)``.
    """
    # Jupyter notebooks are analyzed as Python source.
    if lang == "jupyter":
        lang = "python"

    pattern = _FUNCTION_PATTERNS.get(lang)
    if not pattern:
        return 0, 0

    # Strip comments first so commented-out definitions are not matched.
    stripped = _remove_comments(code, lang)
    matches = pattern.findall(stripped)

    total_params = 0
    for groups in matches:
        # Pick the capture group holding the parameter list; Fortran and
        # Matlab patterns carry an extra leading group, and the shell
        # pattern has only a name group (findall then yields plain strings).
        if lang in ("fortran", "matlab"):
            param_text = groups[2] if len(groups) > 2 else ""
        elif isinstance(groups, tuple) and len(groups) > 1:
            param_text = groups[1]
        else:
            param_text = ""

        param_text = (param_text or "").strip()
        if param_text:
            total_params += sum(1 for p in param_text.split(",") if p.strip())

    return len(matches), total_params
|
|
|
|
def _remove_comments(code: str, lang: str) -> str:
    """Strip comments so function-definition regexes see only real code.

    Line comments are deleted; block comments are replaced with the same
    number of newlines so line positions of the remaining code survive.
    Uses the precompiled regexes in _REMOVE_COMMENT_PATTERNS.
    """
    patterns = _REMOVE_COMMENT_PATTERNS

    def _blank(match):
        # Preserve the line count of the removed block comment.
        return '\n' * match.group(0).count('\n')

    if lang in ("python", "jupyter"):
        code = patterns["python_line"].sub('', code)
        code = patterns["python_triple_dq"].sub(_blank, code)
        code = patterns["python_triple_sq"].sub(_blank, code)
    elif lang in ("c/c++", "java", "rust", "go"):
        code = patterns["c_line"].sub('', code)
        code = patterns["c_block"].sub(_blank, code)
    elif lang in ("shell", "r"):
        # R and shell share the same '#' line-comment syntax, so the two
        # formerly separate branches collapse into one.
        code = patterns["shell_line"].sub('', code)
    elif lang == "matlab":
        code = patterns["matlab_line"].sub('', code)
        code = patterns["matlab_block"].sub(_blank, code)
    elif lang == "fortran":
        code = patterns["fortran_line"].sub('', code)

    return code
|
|
|
|
def count_tokens(code: str):
    """Count Python tokens; fall back to a simple whitespace split.

    The Python tokenizer is attempted on every input regardless of
    language; when it fails (non-Python source, truncated code, bad
    indentation), the number of whitespace-separated words is returned
    instead.
    """
    try:
        return len(list(tokenize.generate_tokens(StringIO(code).readline)))
    except Exception:
        # Fix: the bare `except:` this replaces also swallowed
        # KeyboardInterrupt/SystemExit.  TokenError, IndentationError and
        # decoding issues all land here and trigger the fallback.
        return len(code.split())
|
|
|
|
def analyze_code(code_str, code_path):
    """Compute per-file statistics for one source file.

    Args:
        code_str: File content; ``None`` is treated as the empty string
            (robustness fix: rows missing a "text" field used to crash).
        code_path: Path used only for language detection; ``None``/"" yields
            the unknown-language marker "".

    Returns:
        dict of counters; "idx" is left as ``None`` for the caller to fill.
    """
    if code_str is None:
        code_str = ""
    if not code_path:
        code_path = ""

    lang = detect_language(code_path)

    # NOTE: count("\n") + 1 counts a trailing newline as an extra line, so
    # this can be one higher than len(splitlines()).
    lines = code_str.count("\n") + 1
    empty_lines = sum(1 for line in code_str.splitlines() if not line.strip())
    comment_lines, comment_token_count = count_comments(code_str, lang)
    functions, parameters = count_functions_and_parameters(code_str, lang)
    tokens = count_tokens(code_str)

    return {
        "idx": None,
        "language": lang,
        "total_lines": lines,
        "comment_lines": comment_lines,
        # "comment_tokenst" is a long-standing typo kept so existing
        # downstream consumers keep working; "comment_tokens" is the
        # corrected alias carrying the same value.
        "comment_tokenst": comment_token_count,
        "comment_tokens": comment_token_count,
        "empty_lines": empty_lines,
        "code_lines": lines - empty_lines - comment_lines,
        "tokens": tokens,
        "functions": functions,
        "parameters": parameters,
    }
|
|
|
|
| if __name__ == "__main__": |
| input_dir = "/home/weifengsun/tangou1/domain_code/src/datasets/data_merged" |
| output_dir = "/home/weifengsun/tangou1/domain_code/src/datasets/analysis2" |
| for i in range(110, 120): |
| input_filename = f"{i:03}.csv" |
| output_file_name = f"{i:03}.jsonl" |
|
|
| input_path = os.path.join(input_dir, input_filename) |
| output_path = os.path.join(output_dir, output_file_name) |
|
|
| results = [] |
|
|
| with open(input_path, "r", encoding="utf-8", errors="replace") as f: |
| filtered = (line.replace('\0', '') for line in f) |
| reader = csv.DictReader(filtered) |
|
|
| for idx, row in tqdm(enumerate(reader)): |
| code_str = row.get("text") |
| code_path = row.get("repo_path") |
| if not code_path: |
| code_path = row.get("path") |
|
|
| result = analyze_code(code_str, code_path) |
| result["idx"] = f"{i:03}-{idx}" |
| results.append(result) |
|
|
|
|
| with open(output_path, "w", encoding="utf-8") as f: |
| for r in tqdm(results): |
| f.write(json.dumps(r) + "\n") |
|
|