"""
Scan the repos_filtered/ directory and build per-repository meta profiles:
file counts, sizes, extension distribution, engineering signals, etc.
"""
import os
from pathlib import Path
from collections import Counter, defaultdict
from tqdm import tqdm
import json
import statistics
class RepoMetaScan:
    """Scan repository checkouts and collect per-repository meta statistics.

    For every repository directory under ``repos_dir`` the scanner records:
    file count, total and maximum file size, an extension histogram,
    Jupyter-notebook presence, and "engineering" signals — well-known files
    (Dockerfile, requirements.txt, ...) and directories (tests/, docs/,
    .github/workflows, ...). Results are written as CSV/JSON reports.
    """

    def __init__(self, repos_dir, output_dir, top_n=None):
        """
        Args:
            repos_dir: directory containing one sub-directory per repository,
                named ``owner___repo``.
            output_dir: directory for the CSV/JSON reports (created if missing).
            top_n: scan only the first ``top_n`` repos in lexicographic order;
                ``None`` means scan all of them.
        """
        self.repos_dir = Path(repos_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.top_n = top_n
        # Engineering file / directory markers to detect in each repo.
        self.eng_signals = {
            'files': [
                'Dockerfile', 'docker-compose.yml', 'requirements.txt', 'setup.py',
                'pyproject.toml', 'package.json', 'pom.xml', 'Makefile', 'LICENSE',
                'CITATION.cff', 'CMakeLists.txt', 'Cargo.toml', 'go.mod', 'go.sum'
            ],
            'dirs': [
                '.github/workflows', 'tests', 'test', 'docs', 'doc', 'examples',
                'example', 'data', 'notebooks', 'notebook', 'scripts', 'script'
            ]
        }
        self.stats = []

    def get_repo_full_name(self, dir_name):
        """Convert a directory name to a full name (owner___repo -> owner/repo)."""
        return dir_name.replace('___', '/')

    def scan_repo(self, repo_path):
        """Scan a single repository directory and return its stats dict.

        Files whose ``stat()`` fails (broken symlinks, permission errors) are
        skipped entirely and counted in neither size nor file totals.
        """
        repo_name = repo_path.name
        stats = {
            'repo_name': repo_name,
            'full_name': self.get_repo_full_name(repo_name),
            'total_files': 0,
            'total_size_bytes': 0,
            'max_file_size_bytes': 0,
            'extensions': Counter(),
            'has_ipynb': False,
            'ipynb_count': 0,
            'eng_signals': defaultdict(bool),
            'has_git': (repo_path / '.git').is_dir(),
        }
        # Directories that are never interesting for the meta profile.
        skip_dirs = {'.git', 'node_modules', 'vendor', 'dist', 'build', '__pycache__', '.pytest_cache'}
        # Pre-split directory signals into path components; hoisted out of the walk.
        signal_dir_parts = [tuple(s.split('/')) for s in self.eng_signals['dirs']]
        signal_files = set(self.eng_signals['files'])
        for root, dirs, files in os.walk(repo_path):
            # Prune skipped directories in place so os.walk never descends.
            dirs[:] = [d for d in dirs if d not in skip_dirs]
            # Engineering-directory detection via whole path components:
            # substring matching would falsely flag 'test' for 'contests',
            # 'doc' for 'docsrc', etc.
            rel_parts = Path(root).relative_to(repo_path).parts
            for signal_dir, sig_parts in zip(self.eng_signals['dirs'], signal_dir_parts):
                width = len(sig_parts)
                if any(rel_parts[i:i + width] == sig_parts
                       for i in range(len(rel_parts) - width + 1)):
                    stats['eng_signals'][signal_dir] = True
            for file in files:
                file_path = Path(root) / file
                try:
                    file_size = file_path.stat().st_size
                except OSError:
                    # Unreadable entry — skip it; bare except would also
                    # swallow KeyboardInterrupt/SystemExit.
                    continue
                stats['total_size_bytes'] += file_size
                stats['max_file_size_bytes'] = max(stats['max_file_size_bytes'], file_size)
                stats['total_files'] += 1
                # Extension histogram; extensionless files get a sentinel bucket.
                ext = file_path.suffix.lower() or '<noext>'
                stats['extensions'][ext] += 1
                # Notebook detection
                if ext == '.ipynb':
                    stats['has_ipynb'] = True
                    stats['ipynb_count'] += 1
                # Engineering-file detection (exact basename match, O(1) lookup).
                if file in signal_files:
                    stats['eng_signals'][file] = True
        # Plain dicts serialize cleanly to CSV/JSON.
        stats['extensions'] = dict(stats['extensions'])
        stats['eng_signals'] = dict(stats['eng_signals'])
        return stats

    def scan_all_repos(self):
        """Scan repos in lexicographic order (first top_n, or all when top_n is None)."""
        print(f"Scanning repos in {self.repos_dir}...")
        # All repository directories, sorted for a deterministic selection.
        all_repos = sorted([d for d in self.repos_dir.iterdir() if d.is_dir()])
        if self.top_n is None:
            selected_repos = all_repos
            print(f"Selected {len(selected_repos)} repos (all repos)")
        else:
            selected_repos = all_repos[:self.top_n]
            print(f"Selected {len(selected_repos)} repos (top {self.top_n} by alphabetical order)")
        for repo_path in tqdm(selected_repos, desc="Scanning repos"):
            try:
                stats = self.scan_repo(repo_path)
                self.stats.append(stats)
            except Exception as e:
                # Best-effort: one broken repo must not abort the whole scan.
                print(f"Error scanning {repo_path}: {e}")
                continue

    def save_results(self):
        """Write the per-repo CSV, summary JSON, and distribution CSVs to output_dir."""
        import pandas as pd  # local import keeps the module importable without pandas
        # Per-repo statistics table; the filename encodes the top_n selection.
        df = pd.DataFrame(self.stats)
        # 'is not None' so top_n=0 is not conflated with "all repos".
        top_n_suffix = f"_top{self.top_n}" if self.top_n is not None else ""
        df.to_csv(self.output_dir / f'repo_meta_scan{top_n_suffix}.csv', index=False)
        # Corpus-level summary.
        summary = {
            'total_repos': len(self.stats),
            'total_files': sum(s['total_files'] for s in self.stats),
            'total_size_gb': sum(s['total_size_bytes'] for s in self.stats) / (1024**3),
            'avg_files_per_repo': statistics.mean([s['total_files'] for s in self.stats]) if self.stats else 0,
            'avg_size_mb_per_repo': statistics.mean([s['total_size_bytes'] for s in self.stats]) / (1024**2) if self.stats else 0,
            'repos_with_ipynb': sum(1 for s in self.stats if s['has_ipynb']),
            'total_ipynb_files': sum(s['ipynb_count'] for s in self.stats),
        }
        # Top extensions across the whole corpus.
        all_extensions = Counter()
        for s in self.stats:
            all_extensions.update(s['extensions'])
        summary['top_extensions'] = dict(all_extensions.most_common(30))
        # How many repos exhibit each engineering signal.
        eng_counts = defaultdict(int)
        for s in self.stats:
            for signal, present in s['eng_signals'].items():
                if present:
                    eng_counts[signal] += 1
        summary['engineering_signals'] = dict(eng_counts)
        with open(self.output_dir / 'repo_meta_summary.json', 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)
        # Extension distribution CSV
        ext_df = pd.DataFrame([
            {'extension': ext, 'count': count}
            for ext, count in all_extensions.most_common(50)
        ])
        ext_df.to_csv(self.output_dir / 'extension_distribution.csv', index=False)
        # Engineering signals CSV, most common first.
        eng_df = pd.DataFrame([
            {'signal': signal, 'count': count, 'percentage': count / len(self.stats) * 100}
            for signal, count in sorted(eng_counts.items(), key=lambda x: -x[1])
        ])
        eng_df.to_csv(self.output_dir / 'engineering_signals.csv', index=False)

    def run(self):
        """Execute the full pipeline: scan every selected repo, then save reports."""
        print("Scanning repository metadata...")
        self.scan_all_repos()
        print("Saving results...")
        self.save_results()
        print(f"Repo meta scan complete! Results saved to {self.output_dir}")
if __name__ == "__main__":
    # Hard-coded workdir layout; top_n=None means scan every repository.
    src_repos = "/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered"
    report_dir = "/home/weifengsun/tangou1/domain_code/src/workdir/reporting/repo_meta"
    RepoMetaScan(src_repos, report_dir, top_n=None).run()