# Authored by Asish Karthikeya Gogineni
# Refactor: Upgraded to Agentic Chatbot with AST & Call Graph support (commit 5b89d45)
"""Universal ingestor that handles multiple input types: ZIP files, GitHub URLs, local directories, etc."""
import logging
import os
import shutil
import tempfile
import zipfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, Generator, Optional, Tuple
from urllib.parse import urlparse

import requests
from langchain_core.documents import Document
logger = logging.getLogger(__name__)
class DataManager(ABC):
    """Abstract base class for data managers.

    A data manager knows how to fetch/prepare a dataset (``download``) and
    iterate over its files (``walk``). Concrete subclasses handle specific
    source types (local dirs, ZIP archives, Git repos, web pages).
    """

    def __init__(self, dataset_id: str):
        # Identifier of the underlying dataset (path, URL, or "owner/repo").
        self.dataset_id = dataset_id

    # BUG FIX: these were documented as abstract but were plain methods that
    # silently returned None; now properly enforced via @abstractmethod.
    @abstractmethod
    def download(self) -> bool:
        """Download/prepare the data. Returns True on success."""

    @abstractmethod
    def walk(self, get_content: bool = True) -> Generator[Tuple[Any, Dict], None, None]:
        """Yield (content, metadata) tuples for each file.

        Implementations yield metadata-only dicts when ``get_content`` is False.
        """
class UniversalIngestor(DataManager):
    """Factory class to ingest data from various sources."""

    def __init__(self, source: str, local_dir: Optional[str] = None, **kwargs):
        """
        Args:
            source: Can be:
                - GitHub URL (e.g., "https://github.com/owner/repo")
                - GitHub repo ID (e.g., "owner/repo")
                - Local directory path
                - ZIP file path
                - Web URL
            local_dir: Directory to store/clone/download data
            **kwargs: Additional arguments for specific managers
        """
        super().__init__(dataset_id=source)
        self.source = source
        self.kwargs = kwargs
        self.local_dir = local_dir or os.path.join(tempfile.gettempdir(), "code_chatbot")
        self.delegate = self._detect_handler()

    def _detect_handler(self) -> DataManager:
        """Detect the type of input and return the appropriate handler.

        Raises:
            ValueError: if the source cannot be classified.
        """
        source = self.source.strip()
        if self._is_url(source):
            # Explicit github.com URLs and "owner/repo" shorthand (exactly
            # one slash) both route to the GitHub handler. (Original mixed
            # `or`/`and` without parentheses; the redundant `"/" in source`
            # term is dropped since count("/") == 1 already implies it.)
            if "github.com" in source or source.count("/") == 1:
                if "github.com" in source:
                    # Extract "owner/repo" from the URL path.
                    parts = urlparse(source).path.strip("/").split("/")
                    if len(parts) >= 2:
                        repo_id = f"{parts[0]}/{parts[1]}"
                    else:
                        raise ValueError(f"Invalid GitHub URL: {source}")
                else:
                    # Already in "owner/repo" format.
                    repo_id = source
                return GitHubRepoManager(
                    repo_id=repo_id,
                    local_dir=self.local_dir,
                    **self.kwargs
                )
            # Any other URL is treated as a web document.
            return WebDocManager(source, local_dir=self.local_dir)
        # Local ZIP archive.
        if source.lower().endswith('.zip') and os.path.isfile(source):
            return ZIPFileManager(source, local_dir=self.local_dir)
        # Local directory.
        if os.path.isdir(source):
            return LocalDirectoryManager(source)
        # Single local file.
        if os.path.isfile(source):
            return LocalFileManager(source)
        raise ValueError(f"Unable to determine source type for: {source}")

    def _is_url(self, s: str) -> bool:
        """Return True for real URLs and for "owner/repo" GitHub shorthand.

        BUG FIX: the original checked the shorthand only inside an
        ``except`` block, but ``urlparse`` does not raise on plain strings,
        so "owner/repo" sources (documented as supported) were never
        recognized and fell through to ValueError.
        """
        result = urlparse(s)
        if result.scheme and result.netloc:
            return True
        # GitHub shorthand: exactly one slash and not an existing local path.
        return "/" in s and s.count("/") == 1 and not os.path.exists(s)

    def local_path(self) -> str:
        """Return the local path where data is stored."""
        if hasattr(self.delegate, "local_path"):
            return self.delegate.local_path
        if hasattr(self.delegate, "path"):
            return self.delegate.path
        return self.local_dir

    def download(self) -> bool:
        """Download/prepare the data via the detected delegate."""
        return self.delegate.download()

    def walk(self, get_content: bool = True) -> Generator[Tuple[Any, Dict], None, None]:
        """Yield (content, metadata) tuples from the detected delegate."""
        yield from self.delegate.walk(get_content)
class ZIPFileManager(DataManager):
    """Handles ZIP file ingestion (extract, then walk the extracted tree)."""

    # Directories / extensions that never contain useful source text.
    # Class-level constants: built once, shared with every walk() call.
    IGNORE_DIRS = {'__pycache__', '.git', 'node_modules', 'venv', '.venv', '.env'}
    IGNORE_EXTENSIONS = {
        '.pyc', '.png', '.jpg', '.jpeg', '.gif', '.ico', '.svg', '.mp4', '.mov',
        '.zip', '.tar', '.gz', '.pdf', '.exe', '.bin', '.pkl', '.npy', '.pt', '.pth'
    }

    def __init__(self, zip_path: str, local_dir: str):
        super().__init__(dataset_id=zip_path)
        self.zip_path = zip_path
        self.local_dir = local_dir
        # BUG FIX: splitext instead of replace('.zip', ''), which stripped
        # *every* '.zip' occurrence in the name and missed upper-case '.ZIP'.
        stem = os.path.splitext(os.path.basename(zip_path))[0]
        self.path = os.path.join(local_dir, "extracted", stem)

    def download(self) -> bool:
        """Extract the ZIP archive. Returns True on success."""
        if os.path.exists(self.path):
            logger.info(f"ZIP already extracted to {self.path}")
            return True
        os.makedirs(self.path, exist_ok=True)
        try:
            with zipfile.ZipFile(self.zip_path, 'r') as zip_ref:
                zip_ref.extractall(self.path)
            logger.info(f"Extracted {self.zip_path} to {self.path}")
            return True
        except Exception as e:
            logger.error(f"Failed to extract ZIP: {e}")
            return False

    def walk(self, get_content: bool = True) -> Generator[Tuple[Any, Dict], None, None]:
        """Yield (content, metadata) for every text-like extracted file.

        Hidden files/dirs and binary-ish extensions are skipped; with
        ``get_content=False`` only metadata dicts are yielded.
        """
        if not os.path.exists(self.path):
            return
        for root, dirs, files in os.walk(self.path):
            # Prune ignored and hidden directories in place so os.walk
            # never descends into them.
            dirs[:] = [d for d in dirs if d not in self.IGNORE_DIRS and not d.startswith('.')]
            for file in files:
                if file.startswith('.'):
                    continue
                file_path = os.path.join(root, file)
                _, ext = os.path.splitext(file)
                if ext.lower() in self.IGNORE_EXTENSIONS:
                    continue
                rel_path = os.path.relpath(file_path, self.path)
                if get_content:
                    try:
                        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                            content = f.read()
                        yield content, {
                            "file_path": file_path,
                            "source": rel_path,
                            "file_name": file
                        }
                    except Exception as e:
                        logger.warning(f"Failed to read {file_path}: {e}")
                else:
                    yield {"file_path": file_path, "source": rel_path, "file_name": file}
class LocalDirectoryManager(DataManager):
    """Handles local directory ingestion."""

    # Directories / extensions that never contain useful source text.
    # Hoisted to class constants: the original rebuilt both sets on every
    # walk() call, and this keeps them consistent with ZIPFileManager.
    IGNORE_DIRS = {'__pycache__', '.git', 'node_modules', 'venv', '.venv', '.env'}
    IGNORE_EXTENSIONS = {
        '.pyc', '.png', '.jpg', '.jpeg', '.gif', '.ico', '.svg', '.mp4', '.mov',
        '.zip', '.tar', '.gz', '.pdf', '.exe', '.bin', '.pkl', '.npy', '.pt', '.pth'
    }

    def __init__(self, path: str):
        super().__init__(dataset_id=path)
        self.path = path
        self.local_dir = path

    def download(self) -> bool:
        """Nothing to fetch; just confirm the directory exists."""
        return os.path.isdir(self.path)

    def walk(self, get_content: bool = True) -> Generator[Tuple[Any, Dict], None, None]:
        """Yield (content, metadata) for every text-like file in the tree.

        Hidden files/dirs and binary-ish extensions are skipped; with
        ``get_content=False`` only metadata dicts are yielded.
        """
        for root, dirs, files in os.walk(self.path):
            # Prune ignored and hidden directories in place so os.walk
            # never descends into them.
            dirs[:] = [d for d in dirs if d not in self.IGNORE_DIRS and not d.startswith('.')]
            for file in files:
                if file.startswith('.'):
                    continue
                file_path = os.path.join(root, file)
                _, ext = os.path.splitext(file)
                if ext.lower() in self.IGNORE_EXTENSIONS:
                    continue
                rel_path = os.path.relpath(file_path, self.path)
                if get_content:
                    try:
                        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                            content = f.read()
                        yield content, {
                            "file_path": file_path,
                            "source": rel_path,
                            "url": f"file://{file_path}"
                        }
                    except Exception as e:
                        logger.warning(f"Skipping {file_path}: {e}")
                else:
                    yield {"file_path": file_path, "source": rel_path}
class LocalFileManager(DataManager):
    """Handles single file ingestion."""

    def __init__(self, path: str):
        super().__init__(dataset_id=path)
        self.path = path

    def download(self) -> bool:
        # Nothing to fetch; just confirm the file is present.
        return os.path.exists(self.path)

    def walk(self, get_content: bool = True) -> Generator[Tuple[Any, Dict], None, None]:
        """Yield the single file's content, or just its metadata."""
        meta = {"file_path": self.path, "source": os.path.basename(self.path)}
        if not get_content:
            yield meta
            return
        try:
            with open(self.path, 'r', encoding='utf-8', errors='ignore') as fh:
                text = fh.read()
        except Exception as exc:
            # Unreadable file: log and yield nothing.
            logger.error(f"Failed to read {self.path}: {exc}")
            return
        yield text, meta
class GitHubRepoManager(DataManager):
    """Handles GitHub repository cloning and ingestion."""

    def __init__(self, repo_id: str, local_dir: str, access_token: Optional[str] = None, commit_hash: Optional[str] = None):
        """
        Args:
            repo_id: GitHub repo in format "owner/repo"
            local_dir: Directory to clone to
            access_token: GitHub token for private repos (falls back to the
                GITHUB_TOKEN environment variable)
            commit_hash: Optional commit hash to checkout
        """
        super().__init__(dataset_id=repo_id)
        self.repo_id = repo_id
        self.local_dir = local_dir
        self.access_token = access_token or os.getenv("GITHUB_TOKEN")
        self.commit_hash = commit_hash
        # Flatten "owner/repo" into a single directory name.
        self.path = os.path.join(local_dir, repo_id.replace("/", "_"))

    def download(self) -> bool:
        """Clone the GitHub repository. Returns True on success.

        Raises:
            ImportError: if GitPython is not installed.
        """
        # A non-empty target directory is assumed to be a previous clone.
        # NOTE(review): does not verify it matches self.commit_hash.
        if os.path.exists(self.path) and os.listdir(self.path):
            logger.info(f"Repo already cloned at {self.path}")
            return True
        try:
            # Lazy import: GitPython is an optional dependency.
            # (Dropped the unused GitCommandError import.)
            from git import Repo
            if self.access_token:
                # SECURITY NOTE: embedding the token in the clone URL
                # persists it in the clone's .git/config — acceptable only
                # for throwaway local clones.
                clone_url = f"https://{self.access_token}@github.com/{self.repo_id}.git"
            else:
                clone_url = f"https://github.com/{self.repo_id}.git"
            os.makedirs(self.local_dir, exist_ok=True)
            if self.commit_hash:
                # Full clone so an arbitrary commit is reachable for checkout.
                repo = Repo.clone_from(clone_url, self.path)
                repo.git.checkout(self.commit_hash)
            else:
                # Shallow clone: only the default branch tip is needed.
                Repo.clone_from(clone_url, self.path, depth=1, single_branch=True)
            logger.info(f"Cloned {self.repo_id} to {self.path}")
            return True
        except ImportError:
            logger.error("GitPython not installed. Install with: pip install gitpython")
            raise
        except Exception as e:
            logger.error(f"Failed to clone {self.repo_id}: {e}")
            return False

    def walk(self, get_content: bool = True) -> Generator[Tuple[Any, Dict], None, None]:
        """Walk the cloned repository (delegates to LocalDirectoryManager)."""
        if not os.path.exists(self.path):
            return
        manager = LocalDirectoryManager(self.path)
        yield from manager.walk(get_content)
class WebDocManager(DataManager):
    """Handles web page/document ingestion."""

    def __init__(self, url: str, local_dir: str):
        super().__init__(dataset_id=url)
        self.url = url
        self.local_dir = local_dir

    def download(self) -> bool:
        """Check that the URL is reachable (HTTP 200)."""
        try:
            response = requests.get(self.url, timeout=10)
            return response.status_code == 200
        except Exception as e:
            logger.error(f"Could not reach {self.url}: {e}")
            return False

    def walk(self, get_content: bool = True) -> Generator[Tuple[Any, Dict], None, None]:
        """Fetch the page and yield its visible text (or metadata only)."""
        try:
            response = requests.get(self.url, timeout=10)
            # BUG FIX: the original yielded the body of 4xx/5xx error pages
            # as content; surface the HTTP error instead (it is caught and
            # logged below, so nothing is yielded on failure).
            response.raise_for_status()
            if get_content:
                # Lazy import: bs4 is only needed for text extraction.
                from bs4 import BeautifulSoup
                soup = BeautifulSoup(response.content, 'html.parser')
                text = soup.get_text(separator='\n')
                yield text, {"file_path": self.url, "url": self.url, "source": "web"}
            else:
                yield {"file_path": self.url, "url": self.url, "source": "web"}
        except Exception as e:
            logger.error(f"Failed to fetch {self.url}: {e}")
def process_source(source: str, extract_to: str) -> Tuple[list, str]:
    """
    Convenience function to process any source type and return documents + local path.

    Args:
        source: Any source accepted by UniversalIngestor (URL, repo ID,
            local dir/file, ZIP path).
        extract_to: Directory used to store/clone/extract the data.

    Returns:
        Tuple of (documents, local_path)

    Raises:
        ValueError: if the source cannot be downloaded/prepared.
    """
    ingestor = UniversalIngestor(source, local_dir=extract_to)
    if not ingestor.download():
        raise ValueError(f"Failed to download/prepare source: {source}")
    documents = [
        Document(page_content=content, metadata=metadata)
        for content, metadata in ingestor.walk(get_content=True)
    ]
    # BUG FIX: local_path is a method; the original returned the bound
    # method object itself instead of the path string it computes.
    return documents, ingestor.local_path()