"""
Export repository files to CSV datasets grouped by keyword.

This script processes every file in the repos_filtered directory, groups files
by the keyword recorded in repos_check_history.csv, and writes one CSV file per
keyword.
"""

import csv
import logging
import os
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import pandas as pd
from tqdm import tqdm

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('export_files_to_csv.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Paths (site-specific; adjust for your environment).
REPOS_FILTERED_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered")
REPOS_CHECK_HISTORY_CSV = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_check_history.csv")
OUTPUT_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/dataset_csv")
MAX_FILE_SIZE = None  # Optional per-file byte cap; None disables the check.


# Directory names that should never be traversed or exported.
SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.pytest_cache', '.mypy_cache',
             'venv', 'env', '.venv', '.env', 'dist', 'build', '.eggs'}


def is_skipped_dir(name: str) -> bool:
    """Return True for directory names that should be skipped entirely."""
    # A glob pattern such as '*.egg-info' can never match a set-membership
    # test, so '.egg-info' suffixes are handled explicitly here instead.
    return name in SKIP_DIRS or name.startswith('.') or name.endswith('.egg-info')

# Extensions treated as binary/asset files and skipped outright.
# (.svg is XML text, but it is grouped with the image assets here.)
BINARY_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.ico', '.svg',
                     '.pdf', '.zip', '.tar', '.gz', '.bz2', '.xz', '.7z',
                     '.exe', '.dll', '.so', '.dylib', '.bin', '.o', '.a',
                     '.pyc', '.pyo', '.pyd', '.class', '.jar', '.war',
                     '.mp3', '.mp4', '.avi', '.mov', '.wav', '.flac',
                     '.db', '.sqlite', '.sqlite3', '.h5', '.hdf5', '.pkl', '.pickle'}

# Map file extensions to language names. Note: entries such as '.makefile' and
# '.dockerfile' only match files that literally carry that suffix; a file named
# plain 'Makefile' or 'Dockerfile' has an empty Path.suffix and maps to 'Unknown'.
LANGUAGE_MAP = {
    '.py': 'Python',
    '.js': 'JavaScript',
    '.ts': 'TypeScript',
    '.java': 'Java',
    '.cpp': 'C++',
    '.c': 'C',
    '.cs': 'C#',
    '.go': 'Go',
    '.rs': 'Rust',
    '.rb': 'Ruby',
    '.php': 'PHP',
    '.swift': 'Swift',
    '.kt': 'Kotlin',
    '.scala': 'Scala',
    '.r': 'R',
    '.m': 'MATLAB',
    '.jl': 'Julia',
    '.sh': 'Shell',
    '.bash': 'Bash',
    '.zsh': 'Zsh',
    '.sql': 'SQL',
    '.html': 'HTML',
    '.css': 'CSS',
    '.xml': 'XML',
    '.json': 'JSON',
    '.yaml': 'YAML',
    '.yml': 'YAML',
    '.md': 'Markdown',
    '.tex': 'LaTeX',
    '.f90': 'Fortran',
    '.f': 'Fortran',
    '.f77': 'Fortran',
    '.f95': 'Fortran',
    '.cu': 'CUDA',
    '.cl': 'OpenCL',
    '.hs': 'Haskell',
    '.ml': 'OCaml',
    '.fs': 'F#',
    '.vb': 'Visual Basic',
    '.pl': 'Perl',
    '.pm': 'Perl',
    '.lua': 'Lua',
    '.vim': 'Vim script',
    '.cmake': 'CMake',
    '.makefile': 'Makefile',
    '.dockerfile': 'Dockerfile',
}


def sanitize_keyword(keyword: str) -> str:
    """Sanitize a keyword for use in a filename."""
    # Replace anything that is not a word character, whitespace, or hyphen.
    sanitized = re.sub(r'[^\w\s-]', '_', keyword)
    # Collapse runs of whitespace and hyphens into single underscores.
    sanitized = re.sub(r'[\s-]+', '_', sanitized)
    # Collapse any remaining runs of underscores.
    sanitized = re.sub(r'_+', '_', sanitized)
    # Drop leading/trailing underscores.
    return sanitized.strip('_')
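
# Illustrative examples (assumed inputs, not taken from the source data):
#   sanitize_keyword("deep learning / NLP") -> "deep_learning_NLP"
#   sanitize_keyword("C++ templates")       -> "C_templates"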


def load_keyword_mapping() -> Dict[str, str]:
    """Load the repo full_name -> keyword mapping from repos_check_history.csv."""
    logger.info(f"Loading keyword mapping from {REPOS_CHECK_HISTORY_CSV}")

    mapping: Dict[str, str] = {}
    try:
        # Read in chunks so a very large history file never has to fit in memory.
        chunk_size = 100000
        for chunk in pd.read_csv(REPOS_CHECK_HISTORY_CSV,
                                 usecols=['full_name', 'keyword'],
                                 chunksize=chunk_size):
            chunk = chunk.dropna(subset=['full_name', 'keyword'])
            # If a repo appears under several keywords, the last row wins.
            mapping.update(zip(chunk['full_name'], chunk['keyword']))

        logger.info(f"Loaded {len(mapping)} keyword mappings")
        return mapping
    except Exception as e:
        logger.error(f"Error loading keyword mapping: {e}")
        raise
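
# Assumed shape of repos_check_history.csv (only these two columns are read;
# any others are ignored) -- hypothetical row shown for illustration:
#   full_name,keyword
#   octocat/Hello-World,machine learning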


def is_binary_file(file_path: Path) -> bool:
    """Check whether a file is binary, first by extension and then by content."""
    if file_path.suffix.lower() in BINARY_EXTENSIONS:
        return True

    # Content sniff: a NUL byte in the first 512 bytes, or bytes that are not
    # valid UTF-8, mark the file as binary. Unreadable files are treated the
    # same way so they get skipped.
    try:
        with open(file_path, 'rb') as f:
            chunk = f.read(512)

        if b'\x00' in chunk:
            return True

        try:
            chunk.decode('utf-8')
        except UnicodeDecodeError as e:
            # A multi-byte character truncated at the 512-byte boundary is not
            # evidence of binary content; only earlier decode errors are.
            if e.start < len(chunk) - 3:
                return True
    except Exception:
        return True

    return False
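
# Illustrative: 'model.pkl' is rejected by extension alone; a PNG renamed to
# '.txt' is still caught, because PNG headers contain NUL bytes.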


def should_skip_file(file_path: Path) -> bool:
    """Determine whether a file should be excluded from the export."""
    # Skip anything inside an ignored directory, plus hidden files/directories.
    for part in file_path.parts:
        if is_skipped_dir(part):
            return True

    # Skip README files: prose rather than code.
    file_name = file_path.name.lower()
    if file_name.startswith('readme') and file_path.suffix.lower() in {'.md', '.markdown', '.txt'}:
        return True

    # Skip binary and undecodable files.
    if is_binary_file(file_path):
        return True

    return False


def get_language(file_path: Path) -> str:
    """Get the programming language from the file extension."""
    return LANGUAGE_MAP.get(file_path.suffix.lower(), 'Unknown')


def read_file_content(file_path: Path) -> Optional[str]:
    """Read file content, handling encoding issues."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    except UnicodeDecodeError:
        # Try cp1252 before latin-1: latin-1 maps every possible byte, so it
        # never fails and acts as the final catch-all.
        for encoding in ('cp1252', 'latin-1'):
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
                logger.warning(f"Read {file_path} with {encoding} encoding")
                return content
            except UnicodeDecodeError:
                continue

        logger.warning(f"Could not decode {file_path}, skipping")
        return None
    except Exception as e:
        logger.error(f"Error reading {file_path}: {e}")
        return None


def process_file(file_path: Path, repo_name: str, keyword: str) -> Optional[Dict]:
    """Process a single file and return its metadata and content."""
    if should_skip_file(file_path):
        return None

    try:
        file_size = file_path.stat().st_size
        if MAX_FILE_SIZE is not None and file_size > MAX_FILE_SIZE:
            return None

        # Record the path relative to the repository root.
        repo_dir = REPOS_FILTERED_DIR / repo_name
        try:
            relative_path = file_path.relative_to(repo_dir)
        except ValueError:
            return None

        content = read_file_content(file_path)
        if content is None:
            return None

        # splitlines() counts lines correctly whether or not the file ends
        # with a trailing newline.
        line_count = len(content.splitlines())

        return {
            'keyword': keyword,
            # Directory names encode 'owner/repo' as 'owner___repo'.
            'repo_name': repo_name.replace('___', '/'),
            'file_path': str(relative_path),
            'file_extension': file_path.suffix,
            'file_size': file_size,
            'line_count': line_count,
            'content': content,
            'language': get_language(file_path),
        }
    except Exception as e:
        logger.error(f"Error processing {file_path}: {e}")
        return None
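
# A returned row looks like this (illustrative values only):
#   {'keyword': 'machine learning', 'repo_name': 'octocat/Hello-World',
#    'file_path': 'src/train.py', 'file_extension': '.py', 'file_size': 2048,
#    'line_count': 80, 'content': '...', 'language': 'Python'}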


def process_repo(repo_name: str, keyword_mapping: Dict[str, str]) -> List[Dict]:
    """Process all files in a repository."""
    repo_dir = REPOS_FILTERED_DIR / repo_name

    if not repo_dir.is_dir():
        return []

    # Directory names encode 'owner/repo' as 'owner___repo'.
    full_name = repo_name.replace('___', '/')
    keyword = keyword_mapping.get(full_name)

    if not keyword:
        logger.debug(f"No keyword found for {full_name}, skipping")
        return []

    results = []

    try:
        for root, dirs, files in os.walk(repo_dir):
            # Prune skipped directories in place so os.walk never descends into them.
            dirs[:] = [d for d in dirs if not is_skipped_dir(d)]

            for file in files:
                file_path = Path(root) / file
                result = process_file(file_path, repo_name, keyword)
                if result:
                    results.append(result)
    except Exception as e:
        logger.error(f"Error walking {repo_dir}: {e}")

    return results
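
# Assumed on-disk layout (hypothetical repo name for illustration): a clone of
# 'octocat/Hello-World' is expected at repos_filtered/octocat___Hello-World/.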


class CSVWriterManager:
    """Manage per-keyword CSV writers: open on demand, write rows, close all."""

    def __init__(self, output_dir: Path):
        self.output_dir = output_dir
        self.writers = {}  # keyword -> (file handle, csv.DictWriter)
        self.file_counts = defaultdict(int)
        self.fieldnames = ['keyword', 'repo_name', 'file_path', 'file_extension',
                           'file_size', 'line_count', 'content', 'language']

    def get_writer(self, keyword: str):
        """Get or lazily create the CSV writer for a keyword."""
        if keyword not in self.writers:
            sanitized_keyword = sanitize_keyword(keyword)
            output_file = self.output_dir / f"dataset_{sanitized_keyword}.csv"

            # QUOTE_ALL keeps multi-line 'content' fields intact on round-trip.
            file_handle = open(output_file, 'w', newline='', encoding='utf-8')
            writer = csv.DictWriter(file_handle, fieldnames=self.fieldnames, quoting=csv.QUOTE_ALL)
            writer.writeheader()

            self.writers[keyword] = (file_handle, writer)

        return self.writers[keyword][1]

    def write_row(self, keyword: str, row: Dict):
        """Write a row to the appropriate CSV file."""
        writer = self.get_writer(keyword)
        writer.writerow(row)
        self.file_counts[keyword] += 1

    def close_all(self):
        """Close all open file handles."""
        for keyword, (file_handle, _) in self.writers.items():
            file_handle.close()
            logger.info(f"Closed dataset_{sanitize_keyword(keyword)}.csv with {self.file_counts[keyword]} files")

    def get_stats(self) -> Tuple[int, int]:
        """Return (total_keywords, total_files)."""
        return len(self.writers), sum(self.file_counts.values())
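
# Minimal usage sketch (illustrative):
#   manager = CSVWriterManager(Path("out"))
#   manager.write_row("machine learning", row_dict)  # row_dict: all eight fields
#   manager.close_all()
#
# Note for readers of these CSVs: 'content' fields can exceed the stdlib csv
# module's default field size limit (~128 KiB), so raise it before parsing,
# e.g. csv.field_size_limit(min(sys.maxsize, 2**31 - 1)).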


def main():
    """Main entry point: stream rows to per-keyword CSVs to avoid memory issues."""
    logger.info("Starting file export to CSV (streaming mode)")

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    logger.info(f"Output directory: {OUTPUT_DIR}")

    keyword_mapping = load_keyword_mapping()

    logger.info("Scanning repository directories...")
    repo_dirs = [d.name for d in REPOS_FILTERED_DIR.iterdir() if d.is_dir()]
    logger.info(f"Found {len(repo_dirs)} repositories")

    csv_manager = CSVWriterManager(OUTPUT_DIR)

    logger.info("Processing repositories (streaming mode - writing as we go)...")

    total_files_processed = 0
    repos_processed = 0
    repos_with_no_keyword = 0

    try:
        with tqdm(total=len(repo_dirs), desc="Processing repos") as pbar:
            for repo_name in repo_dirs:
                # Cheap pre-check so unmapped repos are counted without walking them.
                full_name = repo_name.replace('___', '/')
                keyword = keyword_mapping.get(full_name)

                if not keyword:
                    repos_with_no_keyword += 1
                    pbar.update(1)
                    continue

                results = process_repo(repo_name, keyword_mapping)

                if results:
                    # Rows are written immediately instead of being accumulated.
                    for result in results:
                        csv_manager.write_row(result['keyword'], result)
                        total_files_processed += 1
                    repos_processed += 1

                pbar.update(1)

                if repos_processed > 0 and repos_processed % 1000 == 0:
                    logger.info(f"Progress: {repos_processed} repos, {total_files_processed} files")

    finally:
        # Always flush and close every per-keyword CSV handle.
        csv_manager.close_all()

    total_keywords, total_files = csv_manager.get_stats()

    logger.info("=" * 60)
    logger.info("Export completed!")
    logger.info(f"Repositories processed: {repos_processed}")
    logger.info(f"Repositories with no keyword mapping: {repos_with_no_keyword}")
    logger.info(f"Total keywords: {total_keywords}")
    logger.info(f"Total files exported: {total_files}")
    logger.info(f"Output directory: {OUTPUT_DIR}")
    logger.info("=" * 60)


if __name__ == "__main__":
    main()
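
# Invocation sketch (the script filename is assumed from the log file it
# configures above):
#   python export_files_to_csv.py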