File size: 9,948 Bytes
6061012
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
"""
Data processing utilities for the Coding Expert model
"""
import ast
import datetime
import hashlib
import json
import logging
import os
import re
import textwrap
from collections import Counter
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple

import jsonlines
import numpy as np
import pandas as pd
from datasets import Dataset
from tqdm import tqdm

class CodeDataProcessor:
    """Clean, analyse and normalise code samples from several code datasets.

    The processor turns raw dataset rows into plain dictionaries that carry
    the original fields plus a ``code_analysis`` entry (cleaned code, AST
    summary, size metrics and heuristic pattern flags), and can write the
    result to a JSONL file inside ``output_dir``.
    """

    # Word-boundary patterns used by ``_identify_patterns``.  Plain substring
    # checks would also fire on identifiers such as ``classify``, ``globals()``
    # or ``ast.literal_eval(``.
    _CLASS_KEYWORD_RE = re.compile(r"\bclass\b")
    _DEF_KEYWORD_RE = re.compile(r"\bdef\b")
    _GLOBAL_KEYWORD_RE = re.compile(r"\bglobal\b")
    _EVAL_CALL_RE = re.compile(r"\beval\s*\(")

    def __init__(self, output_dir: str = "processed_data"):
        """Create the output directory (if needed) and set up logging.

        Args:
            output_dir: Directory that ``save_to_jsonl`` writes into.
        """
        self.output_dir = Path(output_dir)
        # parents=True so that nested paths such as "out/run1" also work.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.logger = self._setup_logger()

    def _setup_logger(self) -> logging.Logger:
        """Return the module logger, configured at INFO with a stream handler."""
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)
        # The logger object is shared by every instance; attaching a handler
        # on each call would print every message once per instance created.
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def process_code(self, code: str, language: str = "python") -> Dict[str, Any]:
        """Clean a code snippet and collect its analysis.

        Args:
            code: Raw source text.
            language: Language of ``code``; AST information is only produced
                for ``"python"``.

        Returns:
            A dict with the keys ``code`` (cleaned text), ``language``,
            ``ast_info``, ``metrics`` and ``patterns``.  If any step raises,
            the error is logged and ``{"error": <message>}`` is returned
            instead, so one bad sample never aborts a batch.
        """
        try:
            code = self._clean_code(code)
            ast_info = self._parse_ast(code, language)
            metrics = self._extract_code_metrics(code, ast_info)
            patterns = self._identify_patterns(code)

            return {
                "code": code,
                "language": language,
                "ast_info": ast_info,
                "metrics": metrics,
                "patterns": patterns
            }
        except Exception as e:  # deliberate best-effort boundary
            self.logger.warning(f"Error processing code: {str(e)}")
            return {"error": str(e)}

    def _clean_code(self, code: str) -> str:
        """Normalise whitespace without destroying indentation.

        Trailing whitespace is removed from every line, blank lines are
        dropped, and the indentation common to all remaining lines is removed
        (so an indented function body still parses).  Relative indentation is
        preserved because it is significant in Python.
        """
        stripped = (line.rstrip() for line in code.split('\n'))
        kept = [line for line in stripped if line]
        return textwrap.dedent('\n'.join(kept))

    def _parse_ast(self, code: str, language: str) -> Dict[str, Any]:
        """Summarise the AST of ``code``.

        Returns:
            For Python code, a dict with ``num_functions`` (sync and async
            function definitions), ``num_classes`` and ``complexity``.
            For any other language, an empty dict.  If the code cannot be
            parsed, ``{"error": <message>}``.
        """
        if str(language).lower() != "python":
            return {}

        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError, RecursionError, MemoryError) as e:
            return {"error": str(e)}

        function_total = 0
        class_total = 0
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                function_total += 1
            elif isinstance(node, ast.ClassDef):
                class_total += 1

        return {
            "num_functions": function_total,
            "num_classes": class_total,
            "complexity": self._calculate_complexity(tree)
        }

    def _calculate_complexity(self, tree: ast.AST) -> int:
        """Return an approximate cyclomatic complexity for ``tree``.

        Starts at 1 and adds 1 for every ``if``, ``for``/``async for``,
        ``while``, ``try`` and ``except`` node found anywhere in the tree.
        """
        branching = (ast.If, ast.For, ast.AsyncFor, ast.While, ast.Try, ast.ExceptHandler)
        return 1 + sum(1 for node in ast.walk(tree) if isinstance(node, branching))

    def _extract_code_metrics(self, code: str, ast_info: Dict[str, Any]) -> Dict[str, Any]:
        """Compute size metrics for ``code`` and merge in the AST summary.

        Tokens are whitespace-separated chunks of text.  ``token_distribution``
        holds the five most common tokens as ``(token, count)`` pairs.
        Counts missing from ``ast_info`` default to 0.
        """
        tokens = code.split()
        token_counts = Counter(tokens)

        return {
            "length": len(code),
            # Empty code has zero lines, not one.
            "lines": code.count('\n') + 1 if code else 0,
            "tokens": len(tokens),
            "unique_tokens": len(token_counts),
            "ast_complexity": ast_info.get("complexity", 0),
            "function_count": ast_info.get("num_functions", 0),
            "class_count": ast_info.get("num_classes", 0),
            "token_distribution": token_counts.most_common(5)
        }

    def _identify_patterns(self, code: str) -> Dict[str, List[str]]:
        """Flag simple textual indicators of patterns and anti-patterns.

        This is a keyword heuristic on the raw text (it does not distinguish
        code from comments or string literals).  Keywords are matched on word
        boundaries.

        Returns:
            A dict with the lists ``design_patterns``, ``anti_patterns`` and
            ``security_issues``.
        """
        patterns = {
            "design_patterns": [],
            "anti_patterns": [],
            "security_issues": []
        }

        if self._CLASS_KEYWORD_RE.search(code) and self._DEF_KEYWORD_RE.search(code):
            patterns["design_patterns"].append("Class-based design")

        if self._GLOBAL_KEYWORD_RE.search(code):
            patterns["anti_patterns"].append("Global variables")

        if self._EVAL_CALL_RE.search(code):
            patterns["security_issues"].append("Eval usage")

        return patterns

    def process_dataset(self, dataset: "Dataset", dataset_name: str) -> List[Dict[str, Any]]:
        """Process every example of ``dataset``.

        Args:
            dataset: Iterable of example dicts.  ``len()`` is used for the
                start-up log message when the object supports it.
            dataset_name: Name selecting the per-dataset handler (see
                ``_process_example``).

        Returns:
            The successfully processed examples.  Examples whose handler
            raises are logged, counted and skipped.
        """
        processed = []
        error_count = 0

        try:
            sample_total = len(dataset)
        except TypeError:
            # Unsized iterables (e.g. streaming datasets) have no len().
            sample_total = None

        if sample_total is None:
            self.logger.info(f"Processing {dataset_name} dataset (size unknown)")
        else:
            self.logger.info(f"Processing {dataset_name} dataset with {sample_total} samples")

        for idx, example in enumerate(tqdm(dataset, desc=f"Processing {dataset_name}")):
            try:
                processed.append(self._process_example(example, dataset_name))
            except Exception as e:  # keep going: one bad row must not stop the run
                error_count += 1
                self.logger.error(f"Error processing example {idx} in {dataset_name}: {str(e)}")

        self.logger.info(f"Processed {len(processed)} examples")
        self.logger.info(f"Encountered {error_count} errors")

        return processed

    def _process_example(self, example: Dict[str, Any], dataset_name: str) -> Dict[str, Any]:
        """Dispatch ``example`` to the handler registered for ``dataset_name``.

        Raises:
            ValueError: If ``dataset_name`` is not one of the known datasets.
        """
        handlers = {
            "CodeSearchNet": self._process_code_search_net,
            "HumanEval": self._process_human_eval,
            "MBPP": self._process_mbpp,
            "Spider": self._process_spider,
            "DeepFix": self._process_deep_fix,
            "CodeXGLUE": self._process_codexglue,
        }
        handler = handlers.get(dataset_name)
        if handler is None:
            raise ValueError(f"Unknown dataset: {dataset_name}")
        return handler(example)

    def _process_code_search_net(self, example: Dict[str, Any]) -> Dict[str, Any]:
        """Process a CodeSearchNet example; the code is analysed in its own language."""
        return {
            "code": example["code"].strip(),
            "docstring": example["docstring"].strip(),
            "language": example["language"],
            "function_name": example["function_name"],
            "code_analysis": self.process_code(example["code"], example["language"])
        }

    def _process_human_eval(self, example: Dict[str, Any]) -> Dict[str, Any]:
        """Process a HumanEval example; the canonical solution is analysed as Python."""
        return {
            "task_id": example["task_id"],
            "prompt": example["prompt"].strip(),
            "solution": example["canonical_solution"].strip(),
            "test": example["test"].strip(),
            "entry_point": example["entry_point"],
            "code_analysis": self.process_code(example["canonical_solution"])
        }

    def _process_mbpp(self, example: Dict[str, Any]) -> Dict[str, Any]:
        """Process an MBPP example; the solution code is analysed as Python."""
        return {
            "task_id": example["task_id"],
            "problem": example["text"].strip(),
            "solution": example["code"].strip(),
            "test_list": example["test_list"],
            "challenge_test_list": example["challenge_test_list"],
            "code_analysis": self.process_code(example["code"])
        }

    def _process_spider(self, example: Dict[str, Any]) -> Dict[str, Any]:
        """Process a Spider example; the SQL text is analysed as SQL, not Python."""
        return {
            "query": example["query"].strip(),
            "question": example["question"].strip(),
            "db_id": example["db_id"],
            "sql": example["sql"].strip(),
            # Tagging the language keeps the SQL from being fed to the Python parser.
            "code_analysis": self.process_code(example["sql"], "sql")
        }

    def _process_deep_fix(self, example: Dict[str, Any]) -> Dict[str, Any]:
        """Process a DeepFix example; the fixed code is what gets analysed."""
        return {
            "original_code": example["code"].strip(),
            "fixed_code": example["fixed_code"].strip(),
            "error_type": example["error_type"],
            # NOTE(review): analysed with process_code's default language
            # ("python"); confirm the language of DeepFix samples.
            "code_analysis": self.process_code(example["fixed_code"])
        }

    def _process_codexglue(self, example: Dict[str, Any]) -> Dict[str, Any]:
        """Process a CodeXGLUE example; the code is analysed in its own language."""
        return {
            "code": example["code"].strip(),
            "docstring": example["docstring"].strip(),
            "task": example["task"],
            "language": example["language"],
            "code_analysis": self.process_code(example["code"], example["language"])
        }

    def save_to_jsonl(self, data: List[Dict[str, Any]], filename: str) -> Path:
        """Write ``data`` as JSON Lines to ``output_dir / filename``.

        Returns:
            The path of the written file.
        """
        filepath = self.output_dir / filename
        with jsonlines.open(filepath, mode='w') as writer:
            writer.write_all(data)
        self.logger.info(f"Saved data to {filepath}")
        return filepath

    def print_sample(self, data: List[Dict[str, Any]], count: int = 3):
        """Log the first ``count`` items of ``data`` as indented JSON."""
        self.logger.info("\nSample data:")
        for i, example in enumerate(data[:count]):
            self.logger.info(f"\nSample {i+1}:")
            self.logger.info(json.dumps(example, indent=2))

    def print_memory_usage(self):
        """Log the resident memory of the current process in MB.

        Relies on the third-party ``psutil`` package, imported lazily; if it
        is not installed a warning is logged and nothing else happens.
        """
        try:
            import psutil
        except ImportError:
            self.logger.warning("psutil is not installed; cannot report memory usage")
            return

        memory_info = psutil.Process().memory_info()
        self.logger.info(f"Current memory usage: {memory_info.rss / 1024 / 1024:.2f} MB")