| | """ |
| | 扫描 repos_filtered/ 目录,统计仓库元画像 |
| | 文件数/大小/扩展名分布/工程化信号等 |
| | """ |
| | import os |
| | from pathlib import Path |
| | from collections import Counter, defaultdict |
| | from tqdm import tqdm |
| | import json |
| | import statistics |
| |
|
| |
|
class RepoMetaScan:
    """Scan a directory of repository checkouts and build per-repo meta profiles.

    Each repository directory is expected to be named ``owner___repo``.  The
    scan collects file counts, byte sizes, extension distributions, notebook
    presence and "engineering signal" markers (packaging files, CI dirs,
    tests, docs, ...), then writes CSV/JSON reports to ``output_dir``.
    """

    def __init__(self, repos_dir, output_dir, top_n=None):
        """
        Args:
            repos_dir: Directory containing one sub-directory per repository.
            output_dir: Destination for reports; created if it does not exist.
            top_n: If not None, only the first ``top_n`` repositories in
                lexicographic order are scanned.
        """
        self.repos_dir = Path(repos_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.top_n = top_n

        # Marker files and directories that indicate engineering maturity.
        self.eng_signals = {
            'files': [
                'Dockerfile', 'docker-compose.yml', 'requirements.txt', 'setup.py',
                'pyproject.toml', 'package.json', 'pom.xml', 'Makefile', 'LICENSE',
                'CITATION.cff', 'CMakeLists.txt', 'Cargo.toml', 'go.mod', 'go.sum'
            ],
            'dirs': [
                '.github/workflows', 'tests', 'test', 'docs', 'doc', 'examples',
                'example', 'data', 'notebooks', 'notebook', 'scripts', 'script'
            ]
        }

        # One stats dict per scanned repo, filled by scan_all_repos().
        self.stats = []

    def get_repo_full_name(self, dir_name):
        """Convert a directory name to a full name (owner___repo -> owner/repo)."""
        return dir_name.replace('___', '/')

    def scan_repo(self, repo_path):
        """Scan a single repository and return its stats dict.

        Files whose size cannot be stat'ed (broken symlinks, permission
        errors, races) are skipped entirely and excluded from every counter.
        """
        repo_name = repo_path.name
        stats = {
            'repo_name': repo_name,
            'full_name': self.get_repo_full_name(repo_name),
            'total_files': 0,
            'total_size_bytes': 0,
            'max_file_size_bytes': 0,
            'extensions': Counter(),
            'has_ipynb': False,
            'ipynb_count': 0,
            'eng_signals': defaultdict(bool),
            'has_git': os.path.isdir(repo_path / '.git'),
        }

        # Vendored / generated trees that would distort the profile.
        skip_dirs = {'.git', 'node_modules', 'vendor', 'dist', 'build',
                     '__pycache__', '.pytest_cache'}

        # Hoisted out of the walk: O(1) file-signal membership test, and the
        # directory signals pre-split into path components.
        signal_files = set(self.eng_signals['files'])
        dir_signal_parts = [(sig, tuple(sig.split('/')))
                            for sig in self.eng_signals['dirs']]

        for root, dirs, files in os.walk(repo_path):
            # Prune in place so os.walk never descends into skipped trees.
            dirs[:] = [d for d in dirs if d not in skip_dirs]

            # Directory signals: match whole path components, not substrings.
            # (Substring matching would flag e.g. 'doc' for 'docker_stuff' or
            # 'test' for 'contests'.)  Multi-component signals such as
            # '.github/workflows' match a consecutive run of components.
            parts = Path(root).relative_to(repo_path).parts
            for signal, sig_parts in dir_signal_parts:
                width = len(sig_parts)
                if any(parts[i:i + width] == sig_parts
                       for i in range(len(parts) - width + 1)):
                    stats['eng_signals'][signal] = True

            for file in files:
                file_path = Path(root) / file

                try:
                    file_size = file_path.stat().st_size
                except OSError:
                    # Unstatable file (dangling symlink, permissions): skip it.
                    continue
                stats['total_size_bytes'] += file_size
                stats['max_file_size_bytes'] = max(stats['max_file_size_bytes'], file_size)
                stats['total_files'] += 1

                # Extension distribution; extension-less files bucket together.
                ext = file_path.suffix.lower() or '<noext>'
                stats['extensions'][ext] += 1

                if ext == '.ipynb':
                    stats['has_ipynb'] = True
                    stats['ipynb_count'] += 1

                # File-level engineering signals (exact name match).
                if file in signal_files:
                    stats['eng_signals'][file] = True

        # Plain dicts so the result serializes cleanly (json / pandas).
        stats['extensions'] = dict(stats['extensions'])
        stats['eng_signals'] = dict(stats['eng_signals'])

        return stats

    def scan_all_repos(self):
        """Scan repositories: the first ``top_n`` in lexicographic order, or all if top_n is None."""
        print(f"Scanning repos in {self.repos_dir}...")

        all_repos = sorted([d for d in self.repos_dir.iterdir() if d.is_dir()])
        if self.top_n is None:
            selected_repos = all_repos
            print(f"Selected {len(selected_repos)} repos (all repos)")
        else:
            selected_repos = all_repos[:self.top_n]
            print(f"Selected {len(selected_repos)} repos (top {self.top_n} by alphabetical order)")

        for repo_path in tqdm(selected_repos, desc="Scanning repos"):
            try:
                stats = self.scan_repo(repo_path)
                self.stats.append(stats)
            except Exception as e:
                # Best-effort boundary: one unreadable repo must not abort the run.
                print(f"Error scanning {repo_path}: {e}")
                continue

    def save_results(self):
        """Write the per-repo CSV, a JSON summary, and distribution CSVs."""
        # Local import keeps the module importable where pandas is absent.
        import pandas as pd

        # Per-repo table.  `is not None` (not truthiness) so top_n=0 still
        # produces a distinguishing "_top0" suffix.
        df = pd.DataFrame(self.stats)
        top_n_suffix = f"_top{self.top_n}" if self.top_n is not None else ""
        df.to_csv(self.output_dir / f'repo_meta_scan{top_n_suffix}.csv', index=False)

        # Aggregate summary across all scanned repos.
        summary = {
            'total_repos': len(self.stats),
            'total_files': sum(s['total_files'] for s in self.stats),
            'total_size_gb': sum(s['total_size_bytes'] for s in self.stats) / (1024**3),
            'avg_files_per_repo': statistics.mean([s['total_files'] for s in self.stats]) if self.stats else 0,
            'avg_size_mb_per_repo': statistics.mean([s['total_size_bytes'] for s in self.stats]) / (1024**2) if self.stats else 0,
            'repos_with_ipynb': sum(1 for s in self.stats if s['has_ipynb']),
            'total_ipynb_files': sum(s['ipynb_count'] for s in self.stats),
        }

        # Global extension distribution.
        all_extensions = Counter()
        for s in self.stats:
            all_extensions.update(s['extensions'])

        summary['top_extensions'] = dict(all_extensions.most_common(30))

        # How many repos exhibit each engineering signal.
        eng_counts = defaultdict(int)
        for s in self.stats:
            for signal, present in s['eng_signals'].items():
                if present:
                    eng_counts[signal] += 1

        summary['engineering_signals'] = dict(eng_counts)

        with open(self.output_dir / 'repo_meta_summary.json', 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)

        # Extension distribution as its own CSV (top 50).
        ext_df = pd.DataFrame([
            {'extension': ext, 'count': count}
            for ext, count in all_extensions.most_common(50)
        ])
        ext_df.to_csv(self.output_dir / 'extension_distribution.csv', index=False)

        # Engineering signals as their own CSV, most common first.
        eng_df = pd.DataFrame([
            {'signal': signal, 'count': count, 'percentage': count / len(self.stats) * 100}
            for signal, count in sorted(eng_counts.items(), key=lambda x: -x[1])
        ])
        eng_df.to_csv(self.output_dir / 'engineering_signals.csv', index=False)

    def run(self):
        """Execute the full pipeline: scan every selected repo, then save reports."""
        print("Scanning repository metadata...")
        self.scan_all_repos()
        print("Saving results...")
        self.save_results()
        print(f"Repo meta scan complete! Results saved to {self.output_dir}")
| |
|
| |
|
if __name__ == "__main__":
    # Script entry point: scan the filtered-repos workdir and write the
    # meta reports (paths are hard-coded to this workspace layout).
    scanner = RepoMetaScan(
        repos_dir="/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered",
        output_dir="/home/weifengsun/tangou1/domain_code/src/workdir/reporting/repo_meta",
        top_n=None,
    )
    scanner.run()
| |
|
| |
|