# dataset-builder / data1 / reporting / repo_meta_scan.py
# Uploaded by DouDou with huggingface_hub (commit c1ef1a3, verified)
"""
扫描 repos_filtered/ 目录,统计仓库元画像
文件数/大小/扩展名分布/工程化信号等
"""
import os
from pathlib import Path
from collections import Counter, defaultdict
from tqdm import tqdm
import json
import statistics
class RepoMetaScan:
    """Scan cloned repositories and collect per-repository meta statistics.

    For each repository directory (named ``owner___repo``) the scanner records
    file counts, total/max file sizes, extension distribution, notebook
    presence, and the presence of common "engineering" marker files and
    directories (CI config, tests, packaging manifests, ...). Results are
    written as per-repo CSV, a summary JSON, and two distribution CSVs.
    """

    def __init__(self, repos_dir, output_dir, top_n=None):
        """
        Args:
            repos_dir: directory containing one sub-directory per repository.
            output_dir: directory for generated reports (created if missing).
            top_n: scan only the first ``top_n`` repos in lexicographic order;
                ``None`` means scan everything.
        """
        self.repos_dir = Path(repos_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.top_n = top_n
        # Marker files / directories indicating engineering maturity.
        self.eng_signals = {
            'files': [
                'Dockerfile', 'docker-compose.yml', 'requirements.txt', 'setup.py',
                'pyproject.toml', 'package.json', 'pom.xml', 'Makefile', 'LICENSE',
                'CITATION.cff', 'CMakeLists.txt', 'Cargo.toml', 'go.mod', 'go.sum'
            ],
            'dirs': [
                '.github/workflows', 'tests', 'test', 'docs', 'doc', 'examples',
                'example', 'data', 'notebooks', 'notebook', 'scripts', 'script'
            ]
        }
        self.stats = []  # one stats dict per scanned repo

    def get_repo_full_name(self, dir_name):
        """Convert a directory name to a full name (owner___repo -> owner/repo)."""
        return dir_name.replace('___', '/')

    def scan_repo(self, repo_path):
        """Scan a single repository directory and return its stats dict."""
        repo_name = repo_path.name
        stats = {
            'repo_name': repo_name,
            'full_name': self.get_repo_full_name(repo_name),
            'total_files': 0,
            'total_size_bytes': 0,
            'max_file_size_bytes': 0,
            'extensions': Counter(),
            'has_ipynb': False,
            'ipynb_count': 0,
            'eng_signals': defaultdict(bool),
            'has_git': (repo_path / '.git').is_dir(),
        }
        signal_files = set(self.eng_signals['files'])  # O(1) lookup in the hot loop
        skip_dirs = {'.git', 'node_modules', 'vendor', 'dist', 'build',
                     '__pycache__', '.pytest_cache'}
        for root, dirs, files in os.walk(repo_path):
            # Prune vendored / generated directories in place so os.walk skips them.
            dirs[:] = [d for d in dirs if d not in skip_dirs]
            rel_path = Path(root).relative_to(repo_path)
            # Engineering-signal directories: match whole path components, not raw
            # substrings (the old substring test flagged e.g. 'database' for the
            # 'data' signal and 'latest' for 'test').
            rel_posix = f"/{rel_path.as_posix()}/"
            rel_parts = rel_path.parts
            for signal_dir in self.eng_signals['dirs']:
                if '/' in signal_dir:
                    # Multi-component signal (e.g. '.github/workflows'): match it
                    # as a contiguous, component-aligned sub-path.
                    if f"/{signal_dir}/" in rel_posix:
                        stats['eng_signals'][signal_dir] = True
                elif signal_dir in rel_parts:
                    stats['eng_signals'][signal_dir] = True
            for file in files:
                file_path = Path(root) / file
                try:
                    file_size = file_path.stat().st_size
                except OSError:  # broken symlink, permission error, etc. — skip file
                    continue
                stats['total_size_bytes'] += file_size
                stats['max_file_size_bytes'] = max(stats['max_file_size_bytes'], file_size)
                stats['total_files'] += 1
                # Extension distribution ('<noext>' bucket for bare filenames).
                ext = file_path.suffix.lower() or '<noext>'
                stats['extensions'][ext] += 1
                # Notebook detection.
                if ext == '.ipynb':
                    stats['has_ipynb'] = True
                    stats['ipynb_count'] += 1
                # Engineering-signal file detection (exact filename match).
                if file in signal_files:
                    stats['eng_signals'][file] = True
        # Plain dicts serialize cleanly to CSV/JSON.
        stats['extensions'] = dict(stats['extensions'])
        stats['eng_signals'] = dict(stats['eng_signals'])
        return stats

    def scan_all_repos(self):
        """Scan the first ``top_n`` repos in lexicographic order (all when None)."""
        print(f"Scanning repos in {self.repos_dir}...")
        all_repos = sorted(d for d in self.repos_dir.iterdir() if d.is_dir())
        if self.top_n is None:
            selected_repos = all_repos
            print(f"Selected {len(selected_repos)} repos (all repos)")
        else:
            selected_repos = all_repos[:self.top_n]
            print(f"Selected {len(selected_repos)} repos (top {self.top_n} by alphabetical order)")
        for repo_path in tqdm(selected_repos, desc="Scanning repos"):
            try:
                self.stats.append(self.scan_repo(repo_path))
            except Exception as e:
                # Best-effort: one broken repo must not abort the whole scan.
                print(f"Error scanning {repo_path}: {e}")
                continue

    def save_results(self):
        """Write per-repo CSV, summary JSON, and extension / signal reports."""
        import pandas as pd  # local import keeps scanning usable without pandas

        # Per-repo statistics; file name reflects the top_n selection.
        df = pd.DataFrame(self.stats)
        # 'is not None' mirrors scan_all_repos (a truthiness test mislabels top_n=0).
        top_n_suffix = f"_top{self.top_n}" if self.top_n is not None else ""
        df.to_csv(self.output_dir / f'repo_meta_scan{top_n_suffix}.csv', index=False)

        # Aggregate summary.
        file_counts = [s['total_files'] for s in self.stats]
        byte_counts = [s['total_size_bytes'] for s in self.stats]
        summary = {
            'total_repos': len(self.stats),
            'total_files': sum(file_counts),
            'total_size_gb': sum(byte_counts) / (1024 ** 3),
            'avg_files_per_repo': statistics.mean(file_counts) if self.stats else 0,
            'avg_size_mb_per_repo': statistics.mean(byte_counts) / (1024 ** 2) if self.stats else 0,
            'repos_with_ipynb': sum(1 for s in self.stats if s['has_ipynb']),
            'total_ipynb_files': sum(s['ipynb_count'] for s in self.stats),
        }
        # Top extensions across all repos.
        all_extensions = Counter()
        for s in self.stats:
            all_extensions.update(s['extensions'])
        summary['top_extensions'] = dict(all_extensions.most_common(30))
        # Engineering-signal prevalence (number of repos carrying each signal).
        eng_counts = defaultdict(int)
        for s in self.stats:
            for signal, present in s['eng_signals'].items():
                if present:
                    eng_counts[signal] += 1
        summary['engineering_signals'] = dict(eng_counts)
        with open(self.output_dir / 'repo_meta_summary.json', 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)

        # Extension distribution CSV.
        ext_df = pd.DataFrame([
            {'extension': ext, 'count': count}
            for ext, count in all_extensions.most_common(50)
        ])
        ext_df.to_csv(self.output_dir / 'extension_distribution.csv', index=False)

        # Engineering-signal CSV, most common first.
        eng_df = pd.DataFrame([
            {'signal': signal, 'count': count, 'percentage': count / len(self.stats) * 100}
            for signal, count in sorted(eng_counts.items(), key=lambda x: -x[1])
        ])
        eng_df.to_csv(self.output_dir / 'engineering_signals.csv', index=False)

    def run(self):
        """Execute the full pipeline: scan all repos, then save the reports."""
        print("Scanning repository metadata...")
        self.scan_all_repos()
        print("Saving results...")
        self.save_results()
        print(f"Repo meta scan complete! Results saved to {self.output_dir}")
if __name__ == "__main__":
repos_dir = "/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered"
output_dir = "/home/weifengsun/tangou1/domain_code/src/workdir/reporting/repo_meta"
scanner = RepoMetaScan(repos_dir, output_dir, top_n=None) # None表示所有项目
scanner.run()