File size: 7,525 Bytes
c1ef1a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""
扫描 repos_filtered/ 目录,统计仓库元画像
文件数/大小/扩展名分布/工程化信号等
"""
import os
from pathlib import Path
from collections import Counter, defaultdict
from tqdm import tqdm
import json
import statistics


class RepoMetaScan:
    """Collect a metadata profile for every repository under a directory.

    For each repository the scanner records file count, total and maximum
    file size, the file-extension distribution, notebook usage and a set of
    "engineering signals" (well-known files and directories such as
    ``Dockerfile`` or ``tests``). Results are written as CSV/JSON files
    into ``output_dir``.

    Args:
        repos_dir: Directory whose immediate sub-directories are the
            repositories to scan.
        output_dir: Directory for the generated reports; it is created
            (including parents) when the scanner is constructed.
        top_n: When not ``None``, only the first ``top_n`` repositories in
            alphabetical order are scanned.
    """

    def __init__(self, repos_dir, output_dir, top_n=None):
        self.repos_dir = Path(repos_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.top_n = top_n

        # File and directory names treated as engineering signals.
        self.eng_signals = {
            'files': [
                'Dockerfile', 'docker-compose.yml', 'requirements.txt', 'setup.py',
                'pyproject.toml', 'package.json', 'pom.xml', 'Makefile', 'LICENSE',
                'CITATION.cff', 'CMakeLists.txt', 'Cargo.toml', 'go.mod', 'go.sum'
            ],
            'dirs': [
                '.github/workflows', 'tests', 'test', 'docs', 'doc', 'examples',
                'example', 'data', 'notebooks', 'notebook', 'scripts', 'script'
            ]
        }

        # One stats dict per scanned repository (see ``scan_repo``).
        self.stats = []

    def get_repo_full_name(self, dir_name):
        """Convert a directory name to a full name (owner___repo -> owner/repo)."""
        return dir_name.replace('___', '/')

    @staticmethod
    def _has_consecutive_parts(path_parts, wanted_parts):
        """Return True if ``wanted_parts`` occurs as a contiguous run in ``path_parts``.

        Both arguments are tuples of path components. Comparing whole
        components avoids false positives such as ``metadata`` matching
        ``data``, and is independent of the OS path separator.
        """
        span = len(wanted_parts)
        if span == 0:
            return False
        return any(
            path_parts[start:start + span] == wanted_parts
            for start in range(len(path_parts) - span + 1)
        )

    def scan_repo(self, repo_path):
        """Scan a single repository and return its statistics.

        Args:
            repo_path: Path of the repository root (``str`` or ``Path``).

        Returns:
            dict with the keys ``repo_name``, ``full_name``, ``total_files``,
            ``total_size_bytes``, ``max_file_size_bytes``, ``extensions``
            (extension -> count, ``'<noext>'`` for files without a suffix),
            ``has_ipynb``, ``ipynb_count``, ``eng_signals`` (signal -> True,
            only detected signals are present) and ``has_git``.
        """
        repo_path = Path(repo_path)
        repo_name = repo_path.name
        full_name = self.get_repo_full_name(repo_name)

        stats = {
            'repo_name': repo_name,
            'full_name': full_name,
            'total_files': 0,
            'total_size_bytes': 0,
            'max_file_size_bytes': 0,
            'extensions': Counter(),
            'has_ipynb': False,
            'ipynb_count': 0,
            'eng_signals': defaultdict(bool),
            'has_git': (repo_path / '.git').is_dir(),
        }

        # Directories that are never descended into.
        skip_dirs = {'.git', 'node_modules', 'vendor', 'dist', 'build', '__pycache__', '.pytest_cache'}

        # Pre-compute lookups once instead of on every visited directory/file.
        signal_files = set(self.eng_signals['files'])
        signal_dirs = [
            (signal_dir, tuple(part for part in signal_dir.split('/') if part))
            for signal_dir in self.eng_signals['dirs']
        ]

        for root, dirs, files in os.walk(repo_path):
            # Prune in place so os.walk does not enter the skipped directories.
            dirs[:] = [d for d in dirs if d not in skip_dirs]

            # Engineering directories: match on whole path components.
            rel_parts = Path(root).relative_to(repo_path).parts
            for signal_dir, signal_parts in signal_dirs:
                if self._has_consecutive_parts(rel_parts, signal_parts):
                    stats['eng_signals'][signal_dir] = True

            for file in files:
                file_path = Path(root) / file

                # File size; unreadable entries (e.g. broken symlinks) are skipped
                # and not counted.
                try:
                    file_size = file_path.stat().st_size
                except OSError:
                    continue
                stats['total_size_bytes'] += file_size
                stats['max_file_size_bytes'] = max(stats['max_file_size_bytes'], file_size)

                stats['total_files'] += 1

                # Extension (lower-cased).
                ext = file_path.suffix.lower()
                if not ext:
                    ext = '<noext>'
                stats['extensions'][ext] += 1

                # Notebook detection.
                if ext == '.ipynb':
                    stats['has_ipynb'] = True
                    stats['ipynb_count'] += 1

                # Engineering file detection (exact file-name match).
                if file in signal_files:
                    stats['eng_signals'][file] = True

        # Convert the counting containers to plain dicts for serialization.
        stats['extensions'] = dict(stats['extensions'])
        stats['eng_signals'] = dict(stats['eng_signals'])

        return stats

    def scan_all_repos(self):
        """Scan the repositories under ``repos_dir`` and append results to ``self.stats``.

        Repositories are sorted alphabetically; when ``top_n`` is not ``None``
        only the first ``top_n`` are scanned. A failure in one repository is
        reported and does not abort the remaining scans.
        """
        print(f"Scanning repos in {self.repos_dir}...")

        # All repository directories, in alphabetical order.
        all_repos = sorted(d for d in self.repos_dir.iterdir() if d.is_dir())
        if self.top_n is None:
            selected_repos = all_repos
            print(f"Selected {len(selected_repos)} repos (all repos)")
        else:
            selected_repos = all_repos[:self.top_n]
            print(f"Selected {len(selected_repos)} repos (top {self.top_n} by alphabetical order)")

        for repo_path in tqdm(selected_repos, desc="Scanning repos"):
            try:
                stats = self.scan_repo(repo_path)
                self.stats.append(stats)
            except Exception as e:
                # Best effort: keep going so one bad repo does not lose the run.
                print(f"Error scanning {repo_path}: {e}")
                continue

    def save_results(self):
        """Write the collected statistics to ``output_dir``.

        Files written:
            * ``repo_meta_scan[_top<N>].csv`` - one row per repository.
            * ``repo_meta_summary.json`` - aggregate totals, top extensions
              and engineering-signal counts.
            * ``extension_distribution.csv`` - the 50 most common extensions.
            * ``engineering_signals.csv`` - signal counts and percentages.
        """
        import pandas as pd

        # Per-repository statistics (file name depends on top_n). ``is not None``
        # mirrors the check in scan_all_repos so a top_n of 0 cannot overwrite
        # the full-scan file.
        df = pd.DataFrame(self.stats)
        top_n_suffix = f"_top{self.top_n}" if self.top_n is not None else ""
        df.to_csv(self.output_dir / f'repo_meta_scan{top_n_suffix}.csv', index=False)

        # Aggregate summary.
        summary = {
            'total_repos': len(self.stats),
            'total_files': sum(s['total_files'] for s in self.stats),
            'total_size_gb': sum(s['total_size_bytes'] for s in self.stats) / (1024**3),
            'avg_files_per_repo': statistics.mean([s['total_files'] for s in self.stats]) if self.stats else 0,
            'avg_size_mb_per_repo': statistics.mean([s['total_size_bytes'] for s in self.stats]) / (1024**2) if self.stats else 0,
            'repos_with_ipynb': sum(1 for s in self.stats if s['has_ipynb']),
            'total_ipynb_files': sum(s['ipynb_count'] for s in self.stats),
        }

        # Extension counts across all repositories.
        all_extensions = Counter()
        for s in self.stats:
            all_extensions.update(s['extensions'])

        summary['top_extensions'] = dict(all_extensions.most_common(30))

        # Number of repositories exhibiting each engineering signal.
        eng_counts = defaultdict(int)
        for s in self.stats:
            for signal, present in s['eng_signals'].items():
                if present:
                    eng_counts[signal] += 1

        summary['engineering_signals'] = dict(eng_counts)

        with open(self.output_dir / 'repo_meta_summary.json', 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)

        # Extension distribution CSV. Explicit columns keep the header row
        # even when there is nothing to report.
        ext_df = pd.DataFrame(
            [
                {'extension': ext, 'count': count}
                for ext, count in all_extensions.most_common(50)
            ],
            columns=['extension', 'count'],
        )
        ext_df.to_csv(self.output_dir / 'extension_distribution.csv', index=False)

        # Engineering signals CSV, most frequent first. eng_counts is only
        # non-empty when self.stats is, so the division is safe.
        total_repos = len(self.stats)
        eng_df = pd.DataFrame(
            [
                {'signal': signal, 'count': count, 'percentage': count / total_repos * 100}
                for signal, count in sorted(eng_counts.items(), key=lambda x: -x[1])
            ],
            columns=['signal', 'count', 'percentage'],
        )
        eng_df.to_csv(self.output_dir / 'engineering_signals.csv', index=False)

    def run(self):
        """Run the full pipeline: scan all repositories, then save the reports."""
        print("Scanning repository metadata...")
        self.scan_all_repos()
        print("Saving results...")
        self.save_results()
        print(f"Repo meta scan complete! Results saved to {self.output_dir}")


if __name__ == "__main__":
    # Script entry point. The paths and the repository limit can be overridden
    # on the command line; the defaults reproduce the original hard-coded run.
    import argparse  # only needed when executed as a script

    parser = argparse.ArgumentParser(
        description="Scan a directory of repositories and write metadata reports."
    )
    parser.add_argument(
        "--repos-dir",
        default="/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered",
        help="directory whose sub-directories are the repositories to scan",
    )
    parser.add_argument(
        "--output-dir",
        default="/home/weifengsun/tangou1/domain_code/src/workdir/reporting/repo_meta",
        help="directory where the report files are written",
    )
    parser.add_argument(
        "--top-n",
        type=int,
        default=None,
        help="scan only the first N repositories alphabetically (default: all)",
    )
    args = parser.parse_args()

    repos_dir = args.repos_dir
    output_dir = args.output_dir
    scanner = RepoMetaScan(repos_dir, output_dir, top_n=args.top_n)  # None means all repositories
    scanner.run()