#!/usr/bin/env python3
"""
README/Markdown Summarization Script
Scans the README files in each repository, uses an LLM to generate a summary, and writes it back to README_SUMMARY.md in the repository directory
"""
import os
import sys
import json
import asyncio
import argparse
from pathlib import Path
from typing import Any, Dict, List, Optional

from tqdm import tqdm
from dotenv import load_dotenv

# Load the .env file (before importing other modules)
env_file = Path(__file__).parent / ".env"
if env_file.exists():
    load_dotenv(env_file)
elif (Path(__file__).parent.parent / ".env").exists():
    # If not found in the current directory, try the project root
    load_dotenv(Path(__file__).parent.parent / ".env")

# Add the current directory to the path (for importing schemas)
sys.path.insert(0, str(Path(__file__).parent))
# Add domain_code/src to the path to reuse utility functions
sys.path.insert(0, str(Path(__file__).parent.parent / "domain_code" / "src"))
from util import call_llm, init_logger, logger
from schemas import READMESummary

# Default output filename (written back into each repository directory)
SUMMARY_FILENAME = "README_SUMMARY.md"
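
# For reference, a minimal sketch of what `schemas.READMESummary` presumably
# looks like. It is passed as `pydantic_object` to call_llm() below, and
# write_summary_file() reads the two fields shown here; the real schema may
# define additional fields:
#
#   from pydantic import BaseModel
#
#   class READMESummary(BaseModel):
#       project_overview: str
#       main_features: str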

def find_readme_files(repo_dir: Path) -> List[Path]:
    """
    Find README files in the repository.

    Args:
        repo_dir: Repository root directory
    Returns:
        List of README file paths
    """
    readme_files = []
    # README* files in the repository root directory
    for pattern in ["README*", "readme*"]:
        readme_files.extend(repo_dir.glob(pattern))
    # README* files in the docs/ directory
    docs_dir = repo_dir / "docs"
    if docs_dir.exists() and docs_dir.is_dir():
        for pattern in ["README*", "readme*"]:
            readme_files.extend(docs_dir.glob(pattern))
    # Filter: keep only .md or .markdown files
    readme_files = [
        f for f in readme_files
        if f.is_file() and f.suffix.lower() in [".md", ".markdown"]
    ]
    return sorted(set(readme_files))  # Deduplicate and sort
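
# Illustrative example (hypothetical repository layout): for a repository
# containing README.md and docs/readme.zh.md, find_readme_files() returns both
# paths, deduplicated and sorted:
#   >>> find_readme_files(Path("repos/example"))
#   [PosixPath('repos/example/README.md'), PosixPath('repos/example/docs/readme.zh.md')]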

def read_readme_content(readme_files: List[Path]) -> str:
    """
    Read and merge the contents of all README files.

    Args:
        readme_files: List of README file paths
    Returns:
        Merged README content
    """
    contents = []
    for readme_file in readme_files:
        try:
            with open(readme_file, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read().strip()
            if content:
                contents.append(f"## File: {readme_file.name}\n\n{content}")
        except Exception as e:
            logger.warning(f"Unable to read file {readme_file}: {e}")
    return "\n\n---\n\n".join(contents)
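
# The merged string labels each file and separates files with a horizontal
# rule, so a two-file repository would produce (contents elided):
#
#   ## File: README.md
#   ...
#
#   ---
#
#   ## File: readme.zh.md
#   ...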

async def summarize_readme(
    readme_content: str,
    base_url: str,
    model: str,
    api_key: str,
    log_file: str,
) -> Optional[Dict]:
    """
    Use the LLM to summarize README content.

    Args:
        readme_content: README file content
        base_url: LLM API base URL
        model: Model name
        api_key: API key
        log_file: Log file path
    Returns:
        README summary (dict) or None
    """
    # Read the prompt template
    prompt_template_path = Path(__file__).parent / "prompts" / "readme_summary.txt"
    try:
        with open(prompt_template_path, "r", encoding="utf-8") as f:
            prompt_template = f.read()
    except Exception as e:
        logger.error(f"Unable to read prompt template: {e}")
        return None
    # Build the prompt
    prompt = prompt_template.format(readme_content=readme_content)
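    # The template must contain a literal {readme_content} placeholder for the
    # .format() call above to fill in. A hypothetical prompts/readme_summary.txt
    # could be as simple as:
    #   Summarize the following README and answer with JSON matching the
    #   READMESummary schema:
    #   {readme_content}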
    # Call the LLM
    messages = [{"role": "user", "content": prompt}]
    try:
        result = await call_llm(
            messages=messages,
            model=model,
            base_url=base_url,
            api_key=api_key,
            pydantic_object=READMESummary,
            log_file=log_file,
        )
        if result is None:
            logger.warning("LLM call returned None, skipping")
            return None
        # If the result is a string, try to parse it as JSON
        if isinstance(result, str):
            try:
                result = json.loads(result)
            except json.JSONDecodeError:
                logger.warning(f"Unable to parse JSON from LLM response: {result[:200]}")
                return None
        return result
    except Exception as e:
        logger.error(f"LLM call failed: {e}")
        return None

def write_summary_file(
    repo_dir: Path,
    summary: Dict,
    readme_files: List[Path],
) -> Path:
    """
    Write the summary to a README_SUMMARY.md file (in the repository directory).

    Args:
        repo_dir: Repository root directory
        summary: README summary (dict)
        readme_files: Original README file list
    Returns:
        Output file path
    """
    output_file = repo_dir / SUMMARY_FILENAME
    # Build the output content (simplified format)
    lines = []
    lines.append("# Project Summary\n\n")
    # Add the summary fields (only the essential ones)
    if "project_overview" in summary:
        lines.append(f"## Project Overview\n\n{summary['project_overview']}\n\n")
    if "main_features" in summary:
        lines.append(f"## Main Features\n\n{summary['main_features']}\n\n")
    # Write to file
    try:
        with open(output_file, "w", encoding="utf-8") as f:
            f.write("".join(lines))
        logger.info(f"Summary file written: {output_file}")
        return output_file
    except Exception as e:
        logger.error(f"Unable to write summary file {output_file}: {e}")
        raise
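
# Example README_SUMMARY.md produced by write_summary_file(), assuming both
# fields are present in the summary dict (section text is illustrative):
#
#   # Project Summary
#
#   ## Project Overview
#
#   A command-line tool for ...
#
#   ## Main Features
#
#   - Feature one ...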

async def process_single_repo(
    repo_dir: Path,
    base_url: str,
    model: str,
    api_key: str,
    log_file: str,
    overwrite: bool = False,
) -> Dict[str, Any]:
    """
    Run README summarization for a single repository.

    Args:
        repo_dir: Repository root directory
        base_url: LLM API base URL
        model: Model name
        api_key: API key
        log_file: Log file path
        overwrite: Whether to overwrite an existing summary file
    Returns:
        Processing result dictionary
    """
    repo_name = repo_dir.name
    summary_file = repo_dir / SUMMARY_FILENAME
    # Check whether a summary file already exists
    if summary_file.exists() and not overwrite:
        return {
            "repo": repo_name,
            "status": "skipped",
            "reason": "Summary file already exists",
        }
    # Find README files
    readme_files = find_readme_files(repo_dir)
    if not readme_files:
        return {
            "repo": repo_name,
            "status": "no_readme",
            "reason": "README file not found",
        }
    # Read README content
    readme_content = read_readme_content(readme_files)
    if not readme_content:
        return {
            "repo": repo_name,
            "status": "empty_readme",
            "reason": "README file is empty",
        }
    # Call the LLM to generate a summary
    summary = await summarize_readme(
        readme_content=readme_content,
        base_url=base_url,
        model=model,
        api_key=api_key,
        log_file=log_file,
    )
    if summary is None:
        return {
            "repo": repo_name,
            "status": "llm_failed",
            "reason": "LLM call failed",
        }
    # Write the summary file
    try:
        write_summary_file(repo_dir, summary, readme_files)
        return {
            "repo": repo_name,
            "status": "success",
            "summary_file": str(summary_file),
            "readme_count": len(readme_files),
        }
    except Exception as e:
        return {
            "repo": repo_name,
            "status": "write_failed",
            "reason": str(e),
        }

async def process_all_repos(
    repos_dir: Path,
    base_url: str,
    model: str,
    api_key: str,
    log_file: str,
    max_concurrency: int = 8,
    overwrite: bool = False,
) -> List[Dict]:
    """
    Run README summarization for all repositories.

    Args:
        repos_dir: Directory containing all repositories
        base_url: LLM API base URL
        model: Model name
        api_key: API key
        log_file: Log file path
        max_concurrency: Maximum number of concurrent repositories
        overwrite: Whether to overwrite existing summary files
    Returns:
        List of processing results for all repositories
    """
    # Collect all repository directories
    repo_dirs = [
        d for d in repos_dir.iterdir()
        if d.is_dir() and not d.name.startswith(".")
    ]
    repo_dirs.sort()
    logger.info(f"Found {len(repo_dirs)} repositories, starting processing...")
    # Use a semaphore to bound concurrency
    semaphore = asyncio.Semaphore(max_concurrency)

    async def process_with_semaphore(repo_dir: Path):
        async with semaphore:
            return await process_single_repo(
                repo_dir=repo_dir,
                base_url=base_url,
                model=model,
                api_key=api_key,
                log_file=log_file,
                overwrite=overwrite,
            )

    # Process all repositories concurrently
    tasks = [process_with_semaphore(repo_dir) for repo_dir in repo_dirs]
    results = []
    for task in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Processing repos"):
        result = await task
        results.append(result)
    return results
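
# Note: asyncio.as_completed() yields results in completion order, not in the
# sorted repo_dirs order, so entries in `results` are unordered; each result
# dict carries its own "repo" key for identification downstream.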

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="README/Markdown Summarization Tool")
    parser.add_argument(
        "--repos_dir",
        type=str,
        default="/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered",
        help="Path to the directory containing the repositories",
    )
    parser.add_argument(
        "--base_url",
        type=str,
        default=os.getenv("OPENAI_BASE_URL", "http://localhost:8000/v1"),
        help="LLM API base URL (default: http://localhost:8000/v1)",
    )
    parser.add_argument(
        "--model",
        type=str,
        default="Qwen3",
        help="Model name (default: Qwen3)",
    )
    parser.add_argument(
        "--api_key_env",
        type=str,
        default="OPENAI_API_KEY",
        help="Name of the environment variable holding the API key (default: OPENAI_API_KEY)",
    )
    parser.add_argument(
        "--max_concurrency",
        type=int,
        default=8,
        help="Maximum concurrency (default: 8)",
    )
    parser.add_argument(
        "--overwrite",
        action="store_true",
        help="Overwrite existing summary files",
    )
    parser.add_argument(
        "--log_file",
        type=str,
        default="instruction_generation/workdir/logs/summarize.log",
        help="Log file path",
    )
    args = parser.parse_args()

    # Create the log directory first so logger initialization cannot fail on a
    # missing path
    log_file_path = Path(args.log_file)
    log_file_path.parent.mkdir(parents=True, exist_ok=True)

    # Initialize the logger
    init_logger(args.log_file, level="INFO")

    # Get the API key
    api_key = os.getenv(args.api_key_env, "none")

    # Validate the repository directory
    repos_dir = Path(args.repos_dir)
    if not repos_dir.exists():
        logger.error(f"Repository directory does not exist: {repos_dir}")
        sys.exit(1)

    # Run the main logic
    results = asyncio.run(
        process_all_repos(
            repos_dir=repos_dir,
            base_url=args.base_url,
            model=args.model,
            api_key=api_key,
            log_file=str(log_file_path),
            max_concurrency=args.max_concurrency,
            overwrite=args.overwrite,
        )
    )

    # Statistics
    status_counts = {}
    for result in results:
        status = result["status"]
        status_counts[status] = status_counts.get(status, 0) + 1
    logger.info("\n" + "=" * 80)
    logger.info("Processing complete!")
    logger.info("=" * 80)
    logger.info(f"Total: {len(results)} repositories")
    for status, count in status_counts.items():
        logger.info(f"  {status}: {count}")
    logger.info("=" * 80)
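
# Example invocation (paths and model name are illustrative):
#   python summarize_repo_readme.py \
#       --repos_dir ./workdir/repos_filtered \
#       --base_url http://localhost:8000/v1 \
#       --model Qwen3 \
#       --max_concurrency 8 \
#       --overwrite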