import json
import glob
import sys
from collections import defaultdict
from statistics import mean, variance

# Languages that contribute to the cross-language aggregate statistics.
TARGET_LANGS = {
    "python",
    "java",
    "c/c++",
    "fortran",
    "r",
    "matlab",
    "shell",
    "rust",
    "go",
}


# ------------------------
# Load all JSONL data
# ------------------------
def load_jsonl_data(pattern="*.jsonl"):
    """Read every JSONL file matching *pattern* and aggregate numeric fields.

    Each line of each file is parsed as one JSON object; its "language" key
    (default "unknown") groups the record.

    Returns a 3-tuple:
      language_count     -- language name -> number of records
      field_data         -- field name -> list of all numeric values seen
      field_data_by_lang -- language -> field name -> list of numeric values
    """
    jsonl_files = glob.glob(pattern)
    language_count = defaultdict(int)
    field_data = defaultdict(list)  # every numeric value, across languages
    field_data_by_lang = defaultdict(lambda: defaultdict(list))

    for filename in jsonl_files:
        with open(filename, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:  # skip blank lines
                    continue
                obj = json.loads(line)
                lang = obj.get("language", "unknown")
                language_count[lang] += 1
                # Collect all numeric fields for later statistics.
                # NOTE(review): isinstance(v, (int, float)) also matches bool
                # (bool subclasses int); assumed the data has no boolean
                # fields -- confirm against the JSONL schema.
                for k, v in obj.items():
                    if isinstance(v, (int, float)):
                        field_data[k].append(v)
                        field_data_by_lang[lang][k].append(v)

    return language_count, field_data, field_data_by_lang


# ------------------------
# Print language distribution
# ------------------------
def print_language_distribution(language_count):
    """Print per-language record counts and percentages, most frequent first."""
    print("\n========== 语言分布(language counts & percentage) ==========")
    total_items = sum(language_count.values())
    if total_items == 0:
        # No records at all: printing percentages would divide by zero.
        return
    for lang, count in sorted(language_count.items(), key=lambda x: -x[1]):
        pct = count / total_items * 100
        print(f"{lang}: {count} ({pct:.2f}%)")


# ------------------------
# Interval statistics (generic helpers)
# ------------------------
def compute_bins(start, end, step):
    """Build histogram bin edges and human-readable labels.

    Returns (bins, labels): bins = [start, start+step, ..., end] and labels
    has one "a-b" entry per half-open interval [a, b) plus a final "end+"
    overflow label for values >= end.
    """
    bins = list(range(start, end + step, step))
    labels = [f"{bins[i]}-{bins[i+1]}" for i in range(len(bins) - 1)]
    labels.append(f"{end}+")  # overflow bucket
    return bins, labels


def compute_distribution(values, bins, labels):
    """Count how many values fall into each histogram bin.

    A value v is counted for labels[i] when bins[i] <= v < bins[i+1].
    Any unmatched value -- v >= bins[-1], and also v < bins[0] -- goes to
    the final overflow label.
    """
    dist = {label: 0 for label in labels}
    for v in values:
        placed = False
        for i in range(len(bins) - 1):
            if bins[i] <= v < bins[i + 1]:
                dist[labels[i]] += 1
                placed = True
                break
        if not placed:
            dist[labels[-1]] += 1
    return dist


def print_distribution(title, dist, total_count):
    """Print a bin distribution as "label: count (pct%)" lines under *title*."""
    print(f"{title}")
    for label, count in dist.items():
        # Guard against total_count == 0 (empty value list): report 0.00%.
        pct = (count / total_count * 100) if total_count else 0.0
        print(f" {label}: {count} ({pct:.2f}%)")
# ------------------------
# Analyze the distribution of one field
# ------------------------
def analyze_field_distribution(jsonl_dir, field, start, end, step):
    """Load JSONL records and print overall and per-language stats for *field*.

    jsonl_dir      -- glob pattern of the JSONL files to read
    field          -- name of the numeric field to analyze
    start/end/step -- histogram range and bin width (values >= end land in
                      the "end+" overflow bucket)
    """
    print(f"\n================= 分析字段:{field} =================")

    # Load the data.
    language_count, field_data, field_data_by_lang = load_jsonl_data(jsonl_dir)

    # Print the language distribution.
    print_language_distribution(language_count)

    # Bail out when the field never appears in the data.
    if field not in field_data:
        print(f"\n字段 '{field}' 在数据中不存在!")
        return

    # Only values from the target languages contribute to the overall stats.
    values = []
    for lang in TARGET_LANGS:
        values.extend(field_data_by_lang.get(lang, {}).get(field, []))

    print(f"\n========== {field} 整体统计 ==========")
    print(f"个数: {len(values)}")
    if not values:
        # Field exists globally but not for any TARGET_LANGS entry;
        # min()/max()/mean() would raise on an empty sequence.
        return
    print(f"最小值: {min(values)}")
    print(f"最大值: {max(values)}")
    print(f"均值: {mean(values):.2f}")
    if len(values) >= 2:
        print(f"方差: {variance(values):.2f}")
    else:
        print("方差: N/A")  # variance() needs at least two data points

    # Histogram bins shared by the overall and per-language reports.
    bins, labels = compute_bins(start, end, step)

    # Overall bin distribution.
    overall_dist = compute_distribution(values, bins, labels)
    print_distribution("区间分布", overall_dist, len(values))

    # -------- per-language statistics --------
    print(f"\n========== 按语言统计 {field} ==========")
    for lang in TARGET_LANGS:
        fields = field_data_by_lang.get(lang)
        if not fields or field not in fields:
            continue
        vals = fields[field]
        print(f"\n--- {lang} ---")
        print(f"数量: {len(vals)}")
        print(f"最小值: {min(vals)}")
        print(f"最大值: {max(vals)}")
        print(f"均值: {mean(vals):.2f}")
        if len(vals) >= 2:
            print(f"方差: {variance(vals):.2f}")
        else:
            print("方差: N/A")
        # Per-language bin distribution.
        dist = compute_distribution(vals, bins, labels)
        print_distribution("区间分布:", dist, len(vals))


# ------------------------
if __name__ == "__main__":
    jsonl_dir = "/home/weifengsun/tangou1/domain_code/src/datasets/analysis2/*.jsonl"
    # jsonl_dir = "data/*.jsonl"
    # Candidate fields: total_lines, comment_lines, comment_tokens,
    # empty_lines, code_lines, tokens, functions, parameters
    field = "comment_lines"
    start = 0
    end = 200
    step = 20
    analyze_field_distribution(jsonl_dir, field, start, end, step)