import requests import re import io import zipfile import logging from typing import List, Dict, Any, Optional from pathlib import Path from config import settings logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class GitHubIngestor: def __init__(self, repo: str = None): self._repo = "" self.repo = repo or settings.github_repo self.github_token = settings.github_token self.base_url = "https://api.github.com" self.headers = { "Accept": "application/vnd.github.v3+json" } if self.github_token: self.headers["Authorization"] = f"token {self.github_token}" @property def repo(self) -> str: return self._repo @repo.setter def repo(self, value: str): if not value: self._repo = "" return cleaned = value.strip().rstrip("/") if "github.com/" in cleaned: cleaned = cleaned.split("github.com/")[-1] if cleaned.endswith(".git"): cleaned = cleaned[:-4] parts = cleaned.split("/") if len(parts) >= 2: self._repo = f"{parts[-2]}/{parts[-1]}" else: self._repo = cleaned logger.info(f"GitHubIngestor: Initialized with repo '{self._repo}' (original: '{value}')") def get_repo_info(self) -> Dict[str, Any]: if not self.repo: raise ValueError("No repository specified") url = f"{self.base_url}/repos/{self.repo}" response = requests.get(url, headers=self.headers, timeout=10) response.raise_for_status() return response.json() def chunk_content(self, content: str, chunk_size: int = None, chunk_overlap: int = None) -> List[Dict[str, Any]]: chunk_size = chunk_size or getattr(settings, "chunk_size", 500) chunk_overlap = chunk_overlap or getattr(settings, "chunk_overlap", 50) chunks = [] paragraphs = re.split(r"\n\n+", content) current_chunk = "" for para in paragraphs: para = para.strip() if not para: continue if len(current_chunk) + len(para) + 2 > chunk_size: if current_chunk: chunks.append({"text": current_chunk.strip()}) current_chunk = para else: current_chunk = f"{current_chunk}\n\n{para}" if current_chunk else para if current_chunk: chunks.append({"text": current_chunk.strip()}) for i, chunk in enumerate(chunks): chunk["index"] = i return chunks def extract_metadata(self, file_path: str, content: str) -> Dict[str, Any]: return { "source": file_path, "file_type": Path(file_path).suffix, "language": self._detect_language(file_path) } def _detect_language(self, file_path: str) -> str: ext_map = {".py": "python", ".js": "javascript", ".ts": "typescript", ".java": "java", ".md": "markdown"} return ext_map.get(Path(file_path).suffix, "text") def fetch_and_chunk_repo(self, extensions: Optional[List[str]] = None) -> List[Dict[str, Any]]: if not self.repo: raise ValueError("No repository specified for ingestion") branch = "main" try: info = self.get_repo_info() branch = info.get("default_branch", "main") except Exception as e: logger.warning(f"Branch detection failed for {self.repo}, using fallback 'main': {e}") zip_url = f"https://github.com/{self.repo}/archive/refs/heads/{branch}.zip" logger.info(f"Downloading ZIP: {zip_url}") resp = requests.get(zip_url, timeout=60) if resp.status_code != 200 and branch == "main": logger.info("Main branch ZIP failed, trying master...") zip_url = f"https://github.com/{self.repo}/archive/refs/heads/master.zip" resp = requests.get(zip_url, timeout=60) if resp.status_code != 200: raise Exception(f"Failed to download repository {self.repo} (HTTP {resp.status_code}). URL: {zip_url}") all_chunks = [] with zipfile.ZipFile(io.BytesIO(resp.content)) as z: for info in z.infolist(): if info.is_dir() or info.file_size > 1024 * 1024: continue if extensions and not any(info.filename.endswith(ext) for ext in extensions): continue try: with z.open(info) as f: text = f.read().decode('utf-8', errors='ignore') if not text.strip(): continue metadata = self.extract_metadata(info.filename, text) chunks = self.chunk_content(text) for chunk in chunks: chunk.update(metadata) all_chunks.extend(chunks) except Exception as e: logger.warning(f"Error processing {info.filename}: {e}") return all_chunks github_ingestor = GitHubIngestor() def ingest_github_repo(repo: str = None, extensions: List[str] = None): ingestor = GitHubIngestor(repo=repo) return ingestor.fetch_and_chunk_repo(extensions=extensions)