File size: 5,202 Bytes
ec4aa90 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
"""File handling utilities for repository processing."""
import zipfile
import os
from pathlib import Path
from typing import List, Set
import shutil
class FileHandler:
    """Handles file extraction and code file discovery.

    All working data lives under ``upload_dir``. Archives are unpacked into
    its ``extracted`` subdirectory, which is wiped and recreated on every
    call to :meth:`extract_repo`.
    """

    # Supported code file extensions. Entries are lowercase; suffixes are
    # lowercased before lookup so that e.g. ``.R`` and ``.PY`` also match.
    CODE_EXTENSIONS: Set[str] = {
        '.py', '.java', '.js', '.ts', '.jsx', '.tsx',
        '.php', '.rb', '.go', '.rs', '.cpp', '.c', '.h',
        '.cs', '.swift', '.kt', '.scala', '.pl', '.r'
    }

    # Directory names that are pruned (not descended into) while walking a
    # repository in list_code_files.
    EXCLUDE_PATTERNS: Set[str] = {
        '__pycache__', '.git', '.svn', 'node_modules',
        'venv', 'env', '.venv', 'dist', 'build',
        '.idea', '.vscode', '.pytest_cache', '.mypy_cache'
    }

    def __init__(self, upload_dir: str = "./uploads"):
        """
        Initialize file handler.

        Args:
            upload_dir: Directory to store uploaded and extracted files.
                It is created (including parents) if it does not exist.
        """
        self.upload_dir = Path(upload_dir)
        self.upload_dir.mkdir(exist_ok=True, parents=True)

    def extract_repo(self, zip_path: str) -> str:
        """
        Extract uploaded repository ZIP file.

        The archive is unpacked into ``<upload_dir>/extracted``. Any previous
        extraction at that location is deleted first.

        Args:
            zip_path: Path to the ZIP file

        Returns:
            Path to extracted directory

        Raises:
            ValueError: If file is not a valid ZIP or extraction fails
        """
        if not zipfile.is_zipfile(zip_path):
            raise ValueError(f"File {zip_path} is not a valid ZIP file")

        # Fixed (not per-upload) location: each call replaces the result of
        # the previous one.
        extract_path = self.upload_dir / "extracted"

        # Clean up previous extraction
        if extract_path.exists():
            shutil.rmtree(extract_path)
        extract_path.mkdir(exist_ok=True, parents=True)

        # NOTE(review): no limit on member count or uncompressed size is
        # enforced here; confirm callers only pass size-checked uploads.
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_path)
        except Exception as e:
            # Deliberately broad: callers rely on every extraction failure
            # surfacing as ValueError. The cause is chained for debugging.
            raise ValueError(f"Error extracting ZIP file: {e}") from e
        return str(extract_path)

    def list_code_files(self, repo_path: str) -> List[str]:
        """
        List all code files in repository.

        Directories named in ``EXCLUDE_PATTERNS`` are skipped entirely.
        A file counts as code when its extension, compared
        case-insensitively, is in ``CODE_EXTENSIONS``.

        Args:
            repo_path: Path to repository directory

        Returns:
            Sorted list of file paths relative to ``repo_path``
        """
        code_files = []
        repo_root = Path(repo_path)

        for root, dirs, files in os.walk(repo_root):
            # Filter out excluded directories. Assigning through the slice
            # mutates the list os.walk holds, so they are never visited.
            dirs[:] = [d for d in dirs if d not in self.EXCLUDE_PATTERNS]

            root_path = Path(root)
            for filename in files:
                file_path = root_path / filename
                # Lowercase the suffix so 'analysis.R' or 'Main.PY' match.
                if file_path.suffix.lower() in self.CODE_EXTENSIONS:
                    code_files.append(str(file_path.relative_to(repo_root)))

        return sorted(code_files)

    def read_file(self, file_path: str, max_size: int = 1024 * 1024) -> str:
        """
        Read file contents safely.

        The file is decoded as UTF-8; if that fails it is re-read as
        latin-1, which accepts any byte sequence.

        Args:
            file_path: Path to file
            max_size: Maximum file size in bytes (default 1MB)

        Returns:
            File contents as string

        Raises:
            ValueError: If the file does not exist, is not a regular file,
                is too large, or cannot be read
        """
        path = Path(file_path)

        if not path.exists():
            raise ValueError(f"File {path} does not exist")
        # Directories and other non-regular files would otherwise fail in
        # open() with an OSError instead of the documented ValueError.
        if not path.is_file():
            raise ValueError(f"Cannot read file {path}: not a regular file")

        file_size = path.stat().st_size
        if file_size > max_size:
            raise ValueError(
                f"File {path} is too large ({file_size} bytes). "
                f"Maximum size is {max_size} bytes."
            )

        try:
            try:
                with open(path, 'r', encoding='utf-8') as f:
                    return f.read()
            except UnicodeDecodeError:
                # Try with different encoding
                with open(path, 'r', encoding='latin-1') as f:
                    return f.read()
        except OSError as e:
            # Permission errors, races with deletion, etc.
            raise ValueError(f"Cannot read file {path}: {e}") from e

    def get_file_info(self, file_path: str) -> dict:
        """
        Get information about a file.

        Args:
            file_path: Path to file

        Returns:
            ``{"exists": False}`` when the path does not exist; otherwise a
            dictionary with the keys ``exists``, ``name``, ``extension``,
            ``size_bytes``, ``size_kb`` (rounded to 2 decimals) and
            ``is_code`` (extension is in ``CODE_EXTENSIONS``, compared
            case-insensitively)
        """
        path = Path(file_path)

        if not path.exists():
            return {"exists": False}

        stat = path.stat()
        return {
            "exists": True,
            "name": path.name,
            "extension": path.suffix,
            "size_bytes": stat.st_size,
            "size_kb": round(stat.st_size / 1024, 2),
            "is_code": path.suffix.lower() in self.CODE_EXTENSIONS
        }

    def cleanup(self) -> None:
        """Clean up temporary files and directories.

        Removes ``upload_dir`` with everything in it, then recreates it
        empty so the handler remains usable.
        """
        if self.upload_dir.exists():
            shutil.rmtree(self.upload_dir)
        self.upload_dir.mkdir(exist_ok=True, parents=True)
|