# File size: 7,216 Bytes | 5c9df83
import traceback
import os
import json
from concurrent.futures import ProcessPoolExecutor, as_completed
from tree_sitter import Language, Parser
import tree_sitter_c
import tree_sitter_cpp
import tree_sitter_java
import tree_sitter_go
import tree_sitter_rust
import tree_sitter_julia
import tree_sitter_python
########################################
# Configuration
########################################
# Root directory for results; process_project_calls writes
# <OUTPUT_DIR>/<project name>/calls.jsonl beneath it.
OUTPUT_DIR = "/home/weifengsun/tangou1/step2/step22/dataset_calls"

# Default minimum "score" a JSONL record must reach to be kept by
# load_high_score_functions (comparison is >=).
SCORE_THRESHOLD = 0.92

# Directory names collected for exclusion. Nothing in the visible part of this
# file reads this set -- presumably intended for directory traversal; TODO confirm.
EXCLUDE_DIRS = {
    ".git",
    "node_modules",
    "vendor",
    "third_party",
    "build",
    "dist",
    "target",
    "__pycache__",
}
# Per-language settings, keyed by language name.
#   ext            -- file extensions attributed to the language
#   function_nodes -- tree-sitter node types treated as function definitions
#                     (passed to find_function_node_by_line)
#   call_nodes     -- tree-sitter node types treated as calls
#                     (passed to extract_calls_from_node)
#   name_field     -- not read anywhere in the visible part of this file;
#                     presumably the grammar field holding the function name
#                     (TODO confirm)
LANGUAGE_CONFIG = {
    "python": {
        "ext": [".py"],
        "function_nodes": ["function_definition"],
        "call_nodes": ["call"],
        "name_field": "name",
    },
    "c": {
        "ext": [".c", ".h"],
        "function_nodes": ["function_definition"],
        "call_nodes": ["call_expression"],
        "name_field": "declarator",
    },
    "cpp": {
        "ext": [".cpp", ".cc", ".cxx", ".hpp", ".hh"],
        "function_nodes": ["function_definition"],
        "call_nodes": ["call_expression"],
        "name_field": "declarator",
    },
    "java": {
        "ext": [".java"],
        "function_nodes": ["method_declaration", "constructor_declaration"],
        "call_nodes": ["method_invocation", "call_expression"],
        "name_field": "name",
    },
    "go": {
        "ext": [".go"],
        "function_nodes": ["function_declaration", "method_declaration"],
        "call_nodes": ["call_expression"],
        "name_field": "name",
    },
    "rust": {
        "ext": [".rs"],
        "function_nodes": ["function_item"],
        "call_nodes": ["call_expression"],
        "name_field": "name",
    },
    "julia": {
        "ext": [".jl"],
        "function_nodes": ["function_definition"],
        "call_nodes": ["call_expression"],
        "name_field": "name",
    },
}
# Reverse lookup: file extension -> language key of LANGUAGE_CONFIG.
# Built with a comprehension so that no loop variables are left behind in the
# module namespace. If two languages ever listed the same extension, the one
# appearing later in LANGUAGE_CONFIG would win (same as a plain assignment loop).
EXT_TO_LANG = {
    ext: lang_name
    for lang_name, lang_cfg in LANGUAGE_CONFIG.items()
    for ext in lang_cfg["ext"]
}
########################################
# Worker initialization
########################################
# tree-sitter Language objects, created once at import time and keyed by the
# same names as LANGUAGE_CONFIG; init_worker looks them up by that key.
# Each value wraps whatever the grammar package's language() call returns.
LANGUAGES = {
    lang_key: Language(grammar_module.language())
    for lang_key, grammar_module in (
        ("python", tree_sitter_python),
        ("c", tree_sitter_c),
        ("cpp", tree_sitter_cpp),
        ("java", tree_sitter_java),
        ("go", tree_sitter_go),
        ("rust", tree_sitter_rust),
        ("julia", tree_sitter_julia),
    )
}
def init_worker():
    """Create one tree-sitter parser per configured language.

    Passed as ``initializer`` to the ``ProcessPoolExecutor`` in ``main``. The
    parsers are stored in the module-level ``PARSERS`` dict, keyed by the
    language names of ``LANGUAGE_CONFIG``. A language whose parser cannot be
    built is reported on stdout (with traceback) and left out of ``PARSERS``,
    so functions in that language are skipped later instead of failing.
    """
    global PARSERS
    PARSERS = {}
    for lang in LANGUAGE_CONFIG:
        try:
            parser = Parser()
            if hasattr(parser, "set_language"):
                # Older py-tree-sitter API.
                parser.set_language(LANGUAGES[lang])
            else:
                # Newer py-tree-sitter releases removed Parser.set_language();
                # the grammar is assigned through the ``language`` attribute.
                # Without this fallback every language would fail here and
                # PARSERS would silently stay empty.
                parser.language = LANGUAGES[lang]
            PARSERS[lang] = parser
        except Exception:
            # Best effort: keep going with the remaining languages.
            print(f"Failed to load parser for {lang}")
            print(traceback.format_exc())
########################################
# JSONL loading
########################################
def load_high_score_functions(jsonl_path, score_threshold=SCORE_THRESHOLD):
    """Load the records of a JSONL file whose score reaches the threshold.

    Args:
        jsonl_path: Path of a UTF-8 text file holding one JSON object per line.
        score_threshold: Minimum ``"score"`` (inclusive) for a record to be
            kept. A record without a ``"score"`` key is treated as score 0.

    Returns:
        A ``(funcs, names)`` tuple: ``funcs`` is the list of kept records in
        file order, ``names`` is the set of their ``"name"`` values.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a kept record has no ``"name"`` key.
    """
    funcs = []
    names = set()
    with open(jsonl_path, "r", encoding="utf-8") as f:
        for line in f:
            # Blank or whitespace-only lines (e.g. a trailing empty line) are
            # not records; json.loads would raise on them and fail the file.
            if not line.strip():
                continue
            obj = json.loads(line)
            if obj.get("score", 0) >= score_threshold:
                funcs.append(obj)
                names.add(obj["name"])
    return funcs, names
########################################
# Call extraction with Tree-sitter
########################################
def extract_calls_from_node(node, src, call_types):
    """Collect identifier names that sit directly under call nodes.

    Every node in the subtree rooted at ``node`` is visited. For each node
    whose ``type`` is in ``call_types``, the source text of each *direct*
    child of type ``"identifier"`` is recorded. Identifiers nested deeper
    inside a call node (under a non-call child) are not recorded.

    Args:
        node: Root of the subtree to scan; nodes must expose ``type`` and
            ``children``, identifier nodes also ``start_byte``/``end_byte``.
        src: The ``bytes`` the tree was parsed from.
        call_types: Container of node type names that count as calls.

    Returns:
        A set of non-empty identifier strings.
    """
    calls = set()
    # Explicit stack instead of recursion: deeply nested syntax trees would
    # otherwise exceed Python's recursion limit. The result is a set, so the
    # visiting order does not matter.
    pending = [node]
    while pending:
        current = pending.pop()
        children = current.children
        if current.type in call_types:
            for child in children:
                if child.type != "identifier":
                    continue
                # errors="replace": a stray non-UTF-8 byte must not abort the
                # extraction for the whole function.
                text = src[child.start_byte:child.end_byte].decode("utf-8", errors="replace")
                # A zero-width identifier yields "", which is not a usable name.
                if text:
                    calls.add(text)
        pending.extend(children)
    return calls
def find_function_node_by_line(tree, start_line, func_nodes):
    """Find the function node that starts on a given 1-based line.

    The tree is searched in pre-order (a node before its children, children
    left to right); the first node whose ``type`` is in ``func_nodes`` and
    whose starting row plus one equals ``start_line`` is returned.

    Args:
        tree: Parsed tree exposing ``root_node``.
        start_line: 1-based line number the function must start on
            (``start_point[0]`` of a node is 0-based, hence the ``+ 1``).
        func_nodes: Container of node type names that count as functions.

    Returns:
        The matching node, or ``None`` if there is none.
    """
    # Explicit stack instead of recursion so that deeply nested syntax trees
    # cannot exceed Python's recursion limit.
    pending = [tree.root_node]
    while pending:
        node = pending.pop()
        if node.type in func_nodes and node.start_point[0] + 1 == start_line:
            return node
        # Push children reversed so the leftmost child is popped first,
        # which keeps the pre-order of a recursive walk.
        pending.extend(reversed(node.children))
    return None
########################################
# Function-name matching
########################################
def match_names(called_names, indexed_names):
    """Relate called names to indexed names by equality or prefix.

    A called name matches an indexed name when one of the two strings is a
    prefix of the other (which includes the two being equal).

    Args:
        called_names: Iterable of called identifier strings.
        indexed_names: Iterable of known function-name strings.

    Returns:
        A dict mapping each called name that has at least one match to the
        sorted list of matching indexed names. Keys are inserted in sorted
        order, so serialising the result is reproducible across runs even
        when the inputs are sets. Empty names on either side are ignored,
        because an empty string is a prefix of every name.
    """
    # Sort once, outside the loop; falsy entries ("" or None) are dropped.
    candidates = sorted(name for name in indexed_names if name)
    matches = {}
    for call in sorted(name for name in called_names if name):
        hits = [
            name
            for name in candidates
            if name.startswith(call) or call.startswith(name)
        ]
        if hits:
            matches[call] = hits
    return matches
########################################
# Per-project analysis
########################################
def process_project_calls(project_path):
    """Extract call information for one project's high-score functions.

    Reads ``<project_path>/functions.jsonl`` via ``load_high_score_functions``
    and, for every kept record, parses the record's source file, locates the
    function node starting on the record's ``start_line``, collects the called
    identifiers, matches them against the names of all kept records, and
    writes one JSON line to ``<OUTPUT_DIR>/<project name>/calls.jsonl``.

    A record is skipped when its language is missing or has no parser in
    ``PARSERS``, when no function node starts on its ``start_line``, or when
    reading, parsing or analysing it raises (reported on stdout as ``[SKIP]``).

    Args:
        project_path: Directory of the project; its base name names the
            output sub-directory.

    Returns:
        The project name (base name of ``project_path``).

    Raises:
        Exceptions from loading ``functions.jsonl`` or from writing the output
        file propagate to the caller.
    """
    project_name = os.path.basename(project_path.rstrip("/"))
    # Assumes each project's JSONL sits directly inside the project directory.
    input_jsonl = os.path.join(project_path, "functions.jsonl")
    output_path = os.path.join(OUTPUT_DIR, project_name, "calls.jsonl")
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    funcs, name_set = load_high_score_functions(input_jsonl)

    # Single-entry cache of the most recently parsed (file, language): when
    # consecutive records belong to the same file it is read and parsed once.
    parsed_key = None
    src = None
    tree = None

    with open(output_path, "w", encoding="utf-8") as out:
        for func in funcs:
            lang = func.get("language")
            if not lang or lang not in PARSERS:
                continue
            try:
                file_path = func["file"]
                if (file_path, lang) != parsed_key:
                    # Invalidate first so a failure between reading and
                    # parsing cannot leave a mismatched src/tree pair cached.
                    parsed_key = None
                    with open(file_path, "rb") as src_file:
                        src = src_file.read()
                    tree = PARSERS[lang].parse(src)
                    parsed_key = (file_path, lang)

                lang_cfg = LANGUAGE_CONFIG[lang]
                node = find_function_node_by_line(
                    tree, func["start_line"], lang_cfg["function_nodes"]
                )
                if node is None:
                    continue
                calls = extract_calls_from_node(node, src, lang_cfg["call_nodes"])
                matched = match_names(calls, name_set)
                record_line = json.dumps({
                    "function": func["name"],
                    "qualified_name": func.get("qualified_name", func["name"]),
                    "calls": sorted(calls),
                    "matched": matched,
                    "file": file_path,
                    "start_line": func["start_line"],
                    "end_line": func["end_line"],
                    "score": func.get("score", 0),
                    "language": lang,
                }, ensure_ascii=False)
            except Exception as exc:
                # Best effort per record, but never silently: say what was
                # dropped and why, then continue with the next record.
                print(
                    f"[SKIP] {project_name}: "
                    f"{func.get('file')}:{func.get('start_line')}: {exc!r}"
                )
                continue
            # Written outside the try: a failing output file is a project-level
            # error and must surface to the caller.
            out.write(record_line + "\n")
    return project_name
########################################
# Main entry point
########################################
def load_projects(root):
    """Return the paths of the immediate sub-directories of ``root``.

    Each returned path is ``root`` joined with the entry name; entries that
    are not directories are left out. The order is that of ``os.listdir``.
    """
    project_dirs = []
    for entry_name in os.listdir(root):
        candidate = os.path.join(root, entry_name)
        if os.path.isdir(candidate):
            project_dirs.append(candidate)
    return project_dirs
def main(projects_root="/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered"):
    """Run the call extraction for every project directory in parallel.

    Each immediate sub-directory of ``projects_root`` is submitted to a
    process pool whose workers are set up by ``init_worker``. As projects
    finish, ``[OK] <path>`` or ``[FAIL] <path>: <error>`` is printed; one
    failing project does not stop the others.

    Args:
        projects_root: Directory whose sub-directories are the projects to
            analyse. Defaults to the location this script was written for.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    projects = load_projects(projects_root)

    # os.cpu_count() may return None when the count cannot be determined;
    # fall back to a single worker instead of failing inside min().
    max_workers = min(os.cpu_count() or 1, 32)

    with ProcessPoolExecutor(max_workers=max_workers, initializer=init_worker) as pool:
        futures = {pool.submit(process_project_calls, p): p for p in projects}
        for future in as_completed(futures):
            proj = futures[future]
            try:
                future.result()
            except Exception as e:
                print(f"[FAIL] {proj}: {e}")
            else:
                print(f"[OK] {proj}")


if __name__ == "__main__":
    main()