"""
PIOE GitHub Client

Tracks trending repositories and star velocity for AI/Robotics/CV projects.
"""
from datetime import datetime, timedelta, timezone
from typing import Optional

import httpx


class GitHubClient:
    """
    Client for the GitHub REST search API that discovers trending repositories.

    Searches a set of topics for recently pushed, already-starred repositories
    and normalizes each hit into an "opportunity" dict (see ``_parse_repos``
    for the exact keys).
    """

    BASE_URL = "https://api.github.com"

    # Topics searched when the caller does not pass its own list.
    SEARCH_TOPICS = [
        "computer-vision",
        "robotics",
        "machine-learning",
        "deep-learning",
        "ros",
        "pytorch",
        "transformers",
        "llm",
    ]

    # Only the first N topics of a list are searched per fetch_trending() call.
    # NOTE(review): presumably a guard against the search API rate limit;
    # it means the tail of SEARCH_TOPICS is never queried by default -- confirm.
    MAX_TOPICS_PER_FETCH = 5

    # "Recently pushed" window, in days, used by fetch_trending().
    LOOKBACK_DAYS = 7

    def __init__(self, token: Optional[str] = None, max_results: int = 30):
        """
        Args:
            token: Optional GitHub token; when given it is sent as a Bearer
                ``Authorization`` header on every request.
            max_results: Upper bound on the number of opportunities returned
                by ``fetch_trending``.
        """
        self.token = token
        self.max_results = max_results
        self._headers = {
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }
        if token:
            self._headers["Authorization"] = f"Bearer {token}"

    async def fetch_trending(self, topics: Optional[list[str]] = None) -> list[dict]:
        """
        Fetch recently popular repositories in target topics.

        Args:
            topics: Topics to search; defaults to ``SEARCH_TOPICS``. Only the
                first ``MAX_TOPICS_PER_FETCH`` entries are queried.

        Returns:
            De-duplicated (by URL) list of normalized opportunity dicts,
            at most ``max_results`` long. A topic whose search fails is
            reported on stdout and skipped; it does not abort the others.
        """
        topics = topics or self.SEARCH_TOPICS

        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the formatted UTC date is identical.
        cutoff = datetime.now(timezone.utc) - timedelta(days=self.LOOKBACK_DAYS)
        week_ago = cutoff.strftime("%Y-%m-%d")

        opportunities: list[dict] = []
        for topic in topics[: self.MAX_TOPICS_PER_FETCH]:
            try:
                opportunities.extend(await self._search_repos(topic, week_ago))
            except Exception as e:
                # Deliberately broad: this is the per-topic best-effort
                # boundary, so one failing search must not lose the rest.
                print(f"GitHub search error for {topic}: {e}")

        unique = self._dedupe_by_url(opportunities)
        # Clamp so a negative max_results yields nothing instead of silently
        # dropping items from the end of the list.
        return unique[: max(self.max_results, 0)]

    @staticmethod
    def _dedupe_by_url(opportunities: list[dict]) -> list[dict]:
        """Return opportunities with duplicate ``url`` values removed, keeping the first."""
        seen_urls = set()
        unique = []
        for opp in opportunities:
            url = opp["url"]
            if url not in seen_urls:
                seen_urls.add(url)
                unique.append(opp)
        return unique

    async def _search_repositories(self, params: dict) -> list:
        """
        Run one ``/search/repositories`` request and return its ``items`` list.

        Raises whatever ``httpx`` raises for transport errors and for
        non-success status codes (via ``raise_for_status``).
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.BASE_URL}/search/repositories",
                params=params,
                headers=self._headers,
                timeout=30,
                follow_redirects=True,
            )
            response.raise_for_status()
            data = response.json()

        # Treat a missing or null "items" field as an empty result set.
        return data.get("items") or []

    async def _search_repos(self, topic: str, since_date: str) -> list[dict]:
        """Search for repositories by topic pushed after ``since_date`` (YYYY-MM-DD)."""
        query = f"topic:{topic} pushed:>{since_date} stars:>50"
        items = await self._search_repositories(
            {
                "q": query,
                "sort": "stars",
                "order": "desc",
                "per_page": 10,
            }
        )
        return self._parse_repos(items, topic)

    def _parse_repos(self, repos: list, topic: str, label: str = "GitHub") -> list[dict]:
        """
        Parse GitHub repos into normalized opportunities.

        Args:
            repos: Repository objects as returned in the search response.
            topic: Topic the repos were found under; used in ``source_name``.
            label: Tag placed in square brackets at the start of each title.

        Returns:
            One dict per parseable repo. A repo lacking a required field
            (``full_name``, ``html_url``, ``owner.login``) is reported on
            stdout and skipped.
        """
        opportunities = []

        for repo in repos:
            try:
                # The API sends "description": null for repos without one, so
                # a .get() default is not enough -- coerce None to "".
                description = repo.get("description") or ""
                stars = repo.get("stargazers_count", 0)
                opportunity = {
                    "title": f"[{label}] {repo['full_name']}: {description[:100]}",
                    "raw_text": description,
                    "url": repo["html_url"],
                    "source_type": "github",
                    "source_name": f"GitHub/{topic}",
                    "published_at": self._parse_date(repo.get("created_at")),
                    "social_engagement": stars,
                    "metadata": {
                        "owner": repo["owner"]["login"],
                        "stars": stars,
                        "forks": repo.get("forks_count", 0),
                        "language": repo.get("language"),
                        "topics": repo.get("topics", []),
                        "open_issues": repo.get("open_issues_count", 0),
                        "updated_at": repo.get("updated_at"),
                    },
                }
            except (KeyError, TypeError, AttributeError) as e:
                # Malformed entry: skip it but keep the rest of the batch.
                print(f"Error parsing repo: {e}")
            else:
                opportunities.append(opportunity)

        return opportunities

    async def fetch_gsoc_repos(self) -> list[dict]:
        """
        Fetch Google Summer of Code related repositories.

        Returns:
            Normalized opportunity dicts whose titles are tagged ``[GSoC]``
            instead of ``[GitHub]``.
        """
        items = await self._search_repositories(
            {
                "q": "topic:gsoc OR topic:google-summer-of-code",
                "sort": "updated",
                "per_page": 20,
            }
        )
        # Build the title with the GSoC tag directly rather than string-
        # replacing "[GitHub] " afterwards, which would also strip that text
        # from inside a description.
        return self._parse_repos(items, "gsoc", label="GSoC")

    def _parse_date(self, date_str: Optional[str]) -> Optional[datetime]:
        """Parse an ISO-8601 timestamp (with optional ``Z`` suffix); None if absent or invalid."""
        if not date_str:
            return None
        try:
            # fromisoformat() before Python 3.11 rejects the "Z" suffix.
            return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
        except (ValueError, TypeError, AttributeError):
            return None