# Making-Repos-Speakable / github_ingest.py
# Abdullah9862873's picture
# Upload github_ingest.py with huggingface_hub
# 5cc1e2a verified
import requests
import re
import io
import zipfile
import logging
from typing import List, Dict, Any, Optional
from pathlib import Path
from config import settings
# Configure root logging at INFO level when this module is imported.
# (logging.basicConfig does nothing if the root logger already has handlers.)
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by everything below.
logger = logging.getLogger(__name__)
class GitHubIngestor:
    """Download a GitHub repository as a ZIP archive and split its files into text chunks.

    The target repository is kept in normalized ``owner/name`` form (see the
    ``repo`` property). Requests to ``https://api.github.com`` carry an
    ``Authorization`` header when ``settings.github_token`` is set.
    """

    # Archive entries whose uncompressed size exceeds this many bytes are skipped.
    _MAX_FILE_BYTES = 1024 * 1024

    # File suffix (lower-case) -> language label; anything else is reported as "text".
    _LANGUAGE_BY_SUFFIX = {
        ".py": "python",
        ".js": "javascript",
        ".ts": "typescript",
        ".java": "java",
        ".md": "markdown",
    }

    def __init__(self, repo: Optional[str] = None):
        """Create an ingestor for *repo*, falling back to ``settings.github_repo``.

        Args:
            repo: Repository as ``owner/name`` or as a github.com URL. When
                empty or ``None``, ``settings.github_repo`` is used instead.
        """
        self._repo = ""
        self.repo = repo or settings.github_repo
        self.github_token = settings.github_token
        self.base_url = "https://api.github.com"
        self.headers = {
            "Accept": "application/vnd.github.v3+json",
        }
        if self.github_token:
            self.headers["Authorization"] = f"token {self.github_token}"

    @staticmethod
    def _normalize_repo(value: Optional[str]) -> str:
        """Reduce a repository reference to ``owner/name`` form.

        Accepts ``owner/name``, or a github.com URL optionally followed by
        extra path segments (``/tree/main/...``), a query string, a fragment,
        a trailing slash or a ``.git`` suffix. Returns ``""`` for empty input.
        """
        if not value:
            return ""
        cleaned = value.strip().rstrip("/")
        is_url = "github.com/" in cleaned
        if is_url:
            cleaned = cleaned.split("github.com/")[-1]
            # Drop "?query" / "#fragment" so they do not end up in the repo name.
            cleaned = re.split(r"[?#]", cleaned, maxsplit=1)[0].rstrip("/")
        if cleaned.endswith(".git"):
            cleaned = cleaned[:-4]
        parts = cleaned.split("/")
        if len(parts) < 2:
            return cleaned
        if is_url:
            # In a URL the repository is the FIRST two segments after the host;
            # anything after that (tree/<branch>/..., blob/..., issues/...) is a
            # page inside the repository, not part of its name.
            owner, name = parts[0], parts[1]
            if len(parts) > 2 and name.endswith(".git"):
                name = name[:-4]
        else:
            # Non-URL input keeps the historical rule: the last two segments.
            owner, name = parts[-2], parts[-1]
        return f"{owner}/{name}"

    @property
    def repo(self) -> str:
        """The normalized repository reference (``owner/name``), or ``""`` if unset."""
        return self._repo

    @repo.setter
    def repo(self, value: str):
        """Normalize *value* and store it; an empty value clears the repository."""
        if not value:
            self._repo = ""
            return
        self._repo = self._normalize_repo(value)
        logger.info(
            "GitHubIngestor: Initialized with repo '%s' (original: '%s')",
            self._repo,
            value,
        )

    def get_repo_info(self) -> Dict[str, Any]:
        """Fetch the repository's metadata from the GitHub REST API.

        Returns:
            The decoded JSON body of ``GET {base_url}/repos/{repo}``.

        Raises:
            ValueError: If no repository is configured.
            requests.HTTPError: If the API answers with an error status
                (raised by ``response.raise_for_status()``).
        """
        if not self.repo:
            raise ValueError("No repository specified")
        url = f"{self.base_url}/repos/{self.repo}"
        response = requests.get(url, headers=self.headers, timeout=10)
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _split_oversized(text: str, size: int, overlap: int) -> List[str]:
        """Cut *text* into windows of at most *size* characters.

        Consecutive windows start ``size - overlap`` characters apart, so each
        one repeats the last *overlap* characters of the previous window.
        Windows are stripped and whitespace-only windows are dropped.
        Expects ``size >= 1`` and ``0 <= overlap < size``.
        """
        step = size - overlap
        pieces = []
        for start in range(0, len(text), step):
            piece = text[start:start + size].strip()
            if piece:
                pieces.append(piece)
            if start + size >= len(text):
                # This window already reached the end of the text; a further
                # window would only repeat its tail.
                break
        return pieces

    def chunk_content(self, content: str, chunk_size: int = None, chunk_overlap: int = None) -> List[Dict[str, Any]]:
        """Split *content* into chunks of at most *chunk_size* characters.

        Paragraphs (separated by one or more blank lines) are packed greedily
        into a chunk, joined by a blank line, until the next paragraph would
        push the chunk past *chunk_size*. A single paragraph that is longer
        than *chunk_size* is cut into windows of *chunk_size* characters that
        overlap by *chunk_overlap* characters. Chunks built from whole
        paragraphs are not overlapped.

        Args:
            content: Text to split.
            chunk_size: Maximum chunk length in characters. Falls back to
                ``settings.chunk_size`` (or 500) when ``None`` or 0.
            chunk_overlap: Overlap between windows of an oversized paragraph.
                Falls back to ``settings.chunk_overlap`` (or 50) when ``None``;
                an explicit 0 disables overlap. A value that is negative or
                not smaller than the chunk size is treated as 0.

        Returns:
            A list of ``{"text": ..., "index": ...}`` dicts in document order.

        Raises:
            ValueError: If the effective chunk size is not positive.
        """
        size = chunk_size or getattr(settings, "chunk_size", 500)
        if size < 1:
            raise ValueError("chunk_size must be a positive integer")
        # "is None" (not "or") so that an explicit chunk_overlap=0 is honored.
        overlap = chunk_overlap if chunk_overlap is not None else getattr(settings, "chunk_overlap", 50)
        if not overlap or not 0 < overlap < size:
            overlap = 0

        texts = []
        current = ""
        for para in re.split(r"\n\n+", content):
            para = para.strip()
            if not para:
                continue
            if len(para) > size:
                # The paragraph cannot fit in any chunk: flush what we have and
                # window-split it. Its last window stays open so that following
                # short paragraphs can still be packed after it.
                if current:
                    texts.append(current)
                *full_windows, current = self._split_oversized(para, size, overlap)
                texts.extend(full_windows)
            elif current and len(current) + len(para) + 2 > size:
                # +2 accounts for the "\n\n" separator that joining would add.
                texts.append(current)
                current = para
            elif current:
                current = f"{current}\n\n{para}"
            else:
                current = para
        if current:
            texts.append(current)
        return [{"text": text.strip(), "index": i} for i, text in enumerate(texts)]

    def extract_metadata(self, file_path: str, content: str) -> Dict[str, Any]:
        """Build the metadata attached to every chunk of a file.

        Args:
            file_path: Path of the file (as stored in the archive).
            content: File text; currently unused, kept for interface stability.

        Returns:
            A dict with ``source`` (the path), ``file_type`` (the path's
            suffix, unchanged) and ``language`` (see ``_detect_language``).
        """
        return {
            "source": file_path,
            "file_type": Path(file_path).suffix,
            "language": self._detect_language(file_path),
        }

    def _detect_language(self, file_path: str) -> str:
        """Map the file suffix to a language label, case-insensitively; default ``"text"``."""
        return self._LANGUAGE_BY_SUFFIX.get(Path(file_path).suffix.lower(), "text")

    def _download_archive(self) -> bytes:
        """Download the repository's ZIP archive and return its raw bytes.

        The branch is taken from the repository metadata's ``default_branch``;
        if that lookup fails for any reason, ``main`` is assumed. When the
        ``main`` archive cannot be downloaded, ``master`` is tried once.

        Raises:
            RuntimeError: If the archive download does not return HTTP 200.
        """
        branch = "main"
        try:
            repo_info = self.get_repo_info()
            # "or" also covers a present-but-empty default_branch value.
            branch = repo_info.get("default_branch") or "main"
        except Exception as e:
            # Best-effort: branch detection is optional, so any failure here
            # only downgrades to the "main" guess.
            logger.warning(
                "Branch detection failed for %s, using fallback 'main': %s",
                self.repo,
                e,
            )

        # NOTE(review): the archive requests below are sent without
        # self.headers, so the configured token is not used for the download —
        # confirm whether private repositories are expected to work.
        zip_url = f"https://github.com/{self.repo}/archive/refs/heads/{branch}.zip"
        logger.info("Downloading ZIP: %s", zip_url)
        resp = requests.get(zip_url, timeout=60)
        if resp.status_code != 200 and branch == "main":
            logger.info("Main branch ZIP failed, trying master...")
            zip_url = f"https://github.com/{self.repo}/archive/refs/heads/master.zip"
            resp = requests.get(zip_url, timeout=60)
        if resp.status_code != 200:
            raise RuntimeError(
                f"Failed to download repository {self.repo} (HTTP {resp.status_code}). URL: {zip_url}"
            )
        return resp.content

    def fetch_and_chunk_repo(self, extensions: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Download the repository and return the chunks of all matching files.

        Directories and files larger than 1 MiB are skipped, as are files
        whose decoded text is blank. Files are decoded as UTF-8 with
        undecodable bytes dropped. A failure while processing one file is
        logged and does not abort the run.

        Args:
            extensions: Optional filename suffixes (e.g. ``[".py", ".md"]``);
                when given, only files whose name ends with one of them are
                processed. Matching is case-sensitive.

        Returns:
            A flat list of chunk dicts; each has ``text``, ``index`` (position
            within its file) and the keys produced by ``extract_metadata``.

        Raises:
            ValueError: If no repository is configured.
            RuntimeError: If the archive cannot be downloaded.
        """
        if not self.repo:
            raise ValueError("No repository specified for ingestion")

        archive = self._download_archive()
        # str.endswith accepts a tuple, so build the filter once.
        suffixes = tuple(extensions) if extensions else None

        all_chunks = []
        with zipfile.ZipFile(io.BytesIO(archive)) as z:
            for entry in z.infolist():
                if entry.is_dir() or entry.file_size > self._MAX_FILE_BYTES:
                    continue
                if suffixes and not entry.filename.endswith(suffixes):
                    continue
                try:
                    with z.open(entry) as f:
                        text = f.read().decode("utf-8", errors="ignore")
                    if not text.strip():
                        continue
                    metadata = self.extract_metadata(entry.filename, text)
                    chunks = self.chunk_content(text)
                    for chunk in chunks:
                        chunk.update(metadata)
                    all_chunks.extend(chunks)
                except Exception as e:
                    # Best-effort per file: one bad entry must not lose the rest.
                    logger.warning("Error processing %s: %s", entry.filename, e)
        return all_chunks
# Shared module-level instance, created at import time. No repo is passed, so
# GitHubIngestor.__init__ falls back to settings.github_repo.
github_ingestor = GitHubIngestor()
def ingest_github_repo(repo: str = None, extensions: List[str] = None):
    """Ingest a repository with a fresh ``GitHubIngestor`` and return its chunks.

    Args:
        repo: Repository reference handed to ``GitHubIngestor``; when omitted
            the ingestor falls back to its configured default.
        extensions: Optional filename suffixes restricting which files are
            processed.

    Returns:
        Whatever ``GitHubIngestor.fetch_and_chunk_repo`` returns for the
        given extensions.
    """
    return GitHubIngestor(repo=repo).fetch_and_chunk_repo(extensions=extensions)