Stack-2-9-finetuned / scripts /extract_patterns_from_git.py
walidsobhie-code
docs: Add official launch plan
d083607
raw
history blame
9.97 kB
#!/usr/bin/env python3
"""
Extract Code Patterns from Git History
Scans Git commit history to identify bug fixes and feature additions,
extracting "before → after" patterns for training data generation.
Usage:
python extract_patterns_from_git.py --repo-path . --output patterns.jsonl
python extract_patterns_from_git.py --repo-path . --output patterns.jsonl --since-date "2024-01-01"
"""
import argparse
import hashlib
import json
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
try:
from tqdm import tqdm
except ImportError:
tqdm = None
# Commit-message substrings used to classify commits. They are compared
# against the lowercased message with a plain substring test, so "fix" also
# matches "fixes" and "bugfix".

# Substrings that mark a commit as a bug fix.
BUG_FIX_KEYWORDS = [
    # wording that names the repair itself
    "fix",
    "bug",
    "hotfix",
    "patch",
    "resolve",
    "correct",
    "repair",
    # wording that names the symptom being repaired
    "error",
    "crash",
    "fail",
    "issue",
    "problem",
    "broken",
]

# Substrings that mark a commit as a feature addition or improvement.
FEATURE_KEYWORDS = [
    "feat",
    "feature",
    "add",
    "new",
    "implement",
    "enhance",
    "improve",
    "optimize",
    "refactor",
    "support",
    "introduce",
]
# Lowercase suffixes and whole file names that the extractor never treats as
# text. Dot-files such as ".gitignore" have an empty ``Path.suffix``, so these
# entries are compared against the full file name as well as the suffix.
_SKIPPED_SUFFIXES_AND_NAMES = frozenset({
    '.pyc', '.so', '.dll', '.exe', '.bin', '.dat', '.pickle',
    '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.ico', '.svg',
    '.mp3', '.mp4', '.wav', '.avi', '.mov', '.pdf', '.zip',
    '.tar', '.gz', '.rar', '.7z', '.whl', '.egg',
    '.class', '.jar', '.war', '.ear',
    '.db', '.sqlite', '.sqlite3',
    '.ttf', '.otf', '.woff', '.woff2',
    '.pem', '.key', '.crt', '.cer',
    '.ds_store', '.gitignore',
})


def is_text_file(filepath: str) -> bool:
    """Check if a file is likely a text file (not binary).

    Args:
        filepath: Path of the file to inspect; it is opened as given, so a
            relative path is resolved against the current working directory.

    Returns:
        False when the file name or suffix (case-insensitive) is in the skip
        list, when the file cannot be opened or read, or when its first 1024
        bytes contain a NUL byte. True otherwise.
    """
    path = Path(filepath)
    if (
        path.name.lower() in _SKIPPED_SUFFIXES_AND_NAMES
        or path.suffix.lower() in _SKIPPED_SUFFIXES_AND_NAMES
    ):
        return False

    try:
        with open(filepath, 'rb') as f:
            chunk = f.read(1024)
    except OSError:
        # Missing file, directory, permission problem, ...: treat as non-text.
        return False

    # NUL bytes are common in binary files and rare in text.
    return b'\x00' not in chunk
def get_commit_messages(repo_path: str, since_date: Optional[str] = None) -> list[dict]:
"""Get commit information from git log."""
cmd = ["git", "-C", repo_path, "log", "--pretty=format:%H|%s|%an|%ad|%ae", "--date=iso"]
if since_date:
cmd.extend([f"--since={since_date}"])
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
commits = []
for line in result.stdout.strip().split('\n'):
if not line:
continue
parts = line.split('|')
if len(parts) >= 5:
commits.append({
'hash': parts[0],
'message': parts[1],
'author': parts[2],
'date': parts[3],
'email': parts[4] if len(parts) > 4 else ''
})
return commits
except subprocess.CalledProcessError as e:
print(f"Error reading git log: {e}", file=sys.stderr)
return []
def get_changed_files(repo_path: str, commit_hash: str) -> list[str]:
    """Get list of files changed in a commit.

    Args:
        repo_path: Path of the repository, passed to ``git -C``.
        commit_hash: Commit (or any revision git accepts) to inspect.

    Returns:
        Repository-relative paths reported by ``git diff-tree`` for the
        commit, or an empty list when git fails or cannot be started.
    """
    cmd = [
        "git", "-C", repo_path, "diff-tree",
        "--no-commit-id", "--name-only", "-r",
        # Without --root the initial commit (which has no parent to diff
        # against) reports no files at all.
        "--root",
        # NUL-terminated output: paths are emitted verbatim instead of being
        # C-quoted, so unusual or non-ASCII names stay usable as-is.
        "-z",
        commit_hash,
    ]
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
            encoding="utf-8",
            errors="replace",
        )
    except (subprocess.CalledProcessError, OSError):
        return []
    # Names are not stripped: leading/trailing whitespace is part of the path.
    return [name for name in result.stdout.split('\0') if name]
def _show_file_at_revision(repo_path: str, revision: str, filepath: str) -> Optional[str]:
    """Return the content of ``filepath`` at ``revision``, or None if git cannot show it."""
    cmd = ["git", "-C", repo_path, "show", f"{revision}:{filepath}"]
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
            # Blobs are arbitrary bytes. Strict locale decoding would raise
            # UnicodeDecodeError on the first non-decodable file and abort the
            # caller, so decode as UTF-8 and replace undecodable bytes.
            encoding="utf-8",
            errors="replace",
        )
    except (subprocess.CalledProcessError, OSError):
        return None
    return result.stdout


def get_file_diff(repo_path: str, commit_hash: str, filepath: str) -> tuple[Optional[str], Optional[str]]:
    """Get before and after content of a file in a commit.

    Args:
        repo_path: Path of the repository, passed to ``git -C``.
        commit_hash: Commit whose change is being inspected.
        filepath: Repository-relative path of the file.

    Returns:
        A ``(before, after)`` tuple. ``before`` is the file content at the
        commit's first parent and ``after`` the content at the commit itself.
        Either element is None when git cannot show that version, e.g.
        ``before`` for a file added by the commit (or a commit without a
        parent) and ``after`` for a file the commit deleted.
    """
    # Content BEFORE the commit: the same path in the first parent.
    before_content = _show_file_at_revision(repo_path, f"{commit_hash}^", filepath)
    # Content AFTER the commit: the path in the commit itself.
    after_content = _show_file_at_revision(repo_path, commit_hash, filepath)
    return before_content, after_content
def infer_problem_type(message: str) -> str:
    """Classify a commit message as a bug fix, a feature addition, or unknown.

    The message is lowercased and searched for each keyword as a plain
    substring. Bug-fix keywords are checked first, so a message matching both
    lists is reported as ``"bug_fix"``.

    Returns:
        ``"bug_fix"``, ``"feature_addition"`` or ``"unknown"``.
    """
    lowered = message.lower()
    if any(word in lowered for word in BUG_FIX_KEYWORDS):
        return "bug_fix"
    if any(word in lowered for word in FEATURE_KEYWORDS):
        return "feature_addition"
    return "unknown"
def compute_confidence(message: str, before: Optional[str], after: Optional[str]) -> float:
    """Score how trustworthy an extracted before/after pattern is.

    Starts from 0.5 and adds bonuses:
      * +0.2 if the lowercased message contains a fix-style keyword,
      * +0.15 if it contains a feature-style keyword,
      * +0.15 if both versions are non-empty, or +0.05 if only one is,
      * +0.1 if both are non-empty and the longer one exceeds 100 characters,
        and another +0.1 if it exceeds 500.

    Returns:
        The accumulated score, capped at 1.0.
    """
    lowered = message.lower()
    score = 0.5

    # (keywords, bonus) pairs, applied in this order.
    keyword_bonuses = (
        (("fix", "bug", "hotfix", "patch"), 0.2),
        (("feat", "feature", "add", "implement"), 0.15),
    )
    for markers, bonus in keyword_bonuses:
        if any(marker in lowered for marker in markers):
            score += bonus

    has_before = bool(before)
    has_after = bool(after)
    if has_before and has_after:
        score += 0.15
        # Size bonuses only apply when both sides are available.
        longest = max(len(before), len(after))
        if longest > 100:
            score += 0.1
        if longest > 500:
            score += 0.1
    elif has_before or has_after:
        score += 0.05

    return min(score, 1.0)
def generate_pattern_id(commit_hash: str, filepath: str) -> str:
    """Derive a pattern ID from a commit hash and a file path.

    Returns:
        The first 16 hex characters of the SHA-256 digest of
        ``"<commit_hash>:<filepath>"``, so equal inputs always give the
        same ID.
    """
    key = "{}:{}".format(commit_hash, filepath)
    digest = hashlib.sha256(key.encode())
    return digest.hexdigest()[:16]
def extract_patterns(
    repo_path: str,
    output_path: str,
    since_date: Optional[str] = None
) -> int:
    """Extract patterns from git history and write to JSONL file.

    For every commit whose message classifies as a bug fix or feature
    addition, each changed file that still exists in the working tree and
    looks like text is written to ``output_path`` as one JSON object per line
    with the keys ``pattern_id``, ``problem_type``, ``before_code``,
    ``after_code``, ``commit_msg``, ``author``, ``date`` and ``confidence``.

    Args:
        repo_path: Path of the Git repository to scan.
        output_path: JSONL file to create (overwritten if it exists). It is
            not created when no commits are found.
        since_date: Optional lower bound forwarded to ``git log --since``.

    Returns:
        The number of pattern records written.
    """
    print(f"Scanning repository: {repo_path}")

    # Get all commits
    commits = get_commit_messages(repo_path, since_date)
    print(f"Found {len(commits)} commits")
    if not commits:
        print("No commits found.", file=sys.stderr)
        return 0

    patterns_extracted = 0

    # Show a progress bar when tqdm is installed; otherwise iterate plainly.
    iterator = tqdm(commits, desc="Extracting patterns") if tqdm else commits

    with open(output_path, 'w', encoding='utf-8') as outf:
        for commit in iterator:
            commit_hash = commit['hash']
            message = commit['message']
            author = commit['author']
            date = commit['date']

            # Skip commits that are neither a bug fix nor a feature.
            problem_type = infer_problem_type(message)
            if problem_type == "unknown":
                continue

            for filepath in get_changed_files(repo_path, commit_hash):
                # Paths from git are relative to the repository root, so
                # every filesystem check must go through the joined path.
                full_path = os.path.join(repo_path, filepath)

                # Skip files that no longer exist in the working tree.
                if not os.path.exists(full_path):
                    continue
                # Skip binary files. The joined path is required here: the
                # bare repo-relative path would be resolved against the
                # current working directory, not the repository.
                if not is_text_file(full_path):
                    continue

                before_content, after_content = get_file_diff(repo_path, commit_hash, filepath)

                # Skip if no meaningful change
                if before_content == after_content:
                    continue
                if not before_content and not after_content:
                    continue

                confidence = compute_confidence(message, before_content, after_content)

                pattern = {
                    "pattern_id": generate_pattern_id(commit_hash, filepath),
                    "problem_type": problem_type,
                    "before_code": before_content or "",
                    "after_code": after_content or "",
                    "commit_msg": message,
                    "author": author,
                    "date": date,
                    "confidence": round(confidence, 2)
                }

                # One JSON object per line (JSONL).
                outf.write(json.dumps(pattern, ensure_ascii=False) + '\n')
                patterns_extracted += 1

    print(f"\nExtracted {patterns_extracted} patterns to {output_path}")
    return patterns_extracted
def main():
    """Parse command-line arguments and run the pattern extraction.

    Exits with status 1 when ``--repo-path`` does not contain a ``.git``
    entry.
    """
    parser = argparse.ArgumentParser(
        description="Extract code patterns from Git history for training data"
    )
    parser.add_argument(
        "--repo-path",
        type=str,
        required=True,
        help="Path to the Git repository"
    )
    parser.add_argument(
        "--output",
        type=str,
        required=True,
        help="Output JSONL file path"
    )
    parser.add_argument(
        "--since-date",
        type=str,
        default=None,
        help="Only extract commits since this date (YYYY-MM-DD)"
    )
    args = parser.parse_args()

    # Validate repo path. `.git` is a directory in a normal clone but a
    # regular file in linked worktrees and submodules, so test for existence
    # rather than for a directory.
    if not os.path.exists(os.path.join(args.repo_path, '.git')):
        print(f"Error: {args.repo_path} is not a Git repository", file=sys.stderr)
        sys.exit(1)

    # Run extraction
    extract_patterns(args.repo_path, args.output, args.since_date)


if __name__ == "__main__":
    main()