DIVYANSHI SINGH
🚀 Initial Commit: GitPulse
fcfc3c8
from typing import Optional
import asyncio
from fastapi import APIRouter, HTTPException, Header
from fastapi.responses import ORJSONResponse
from core.models import ProjectSearchResult, RigorResult
from core.cache import cache_get, cache_set
from core.github import get_github_data, GITHUB_API
from core.utils import validate_username, validate_param
router = APIRouter(default_response_class=ORJSONResponse)
@router.get("/api/search_projects", response_model=ProjectSearchResult)
async def search_projects(topic: str, language: Optional[str] = None, page: int = 1, x_github_token: str = Header(None)):
topic = validate_param(topic, "topic")
query = topic
if language and language.strip():
query += f' language:"{validate_param(language, "language")}"'
cache_key = f"projects:{query}:{page}"
cached = cache_get(cache_key)
if cached:
return cached
params = {"q": query, "per_page": 12, "page": page, "sort": "stars", "order": "desc"}
res = await get_github_data(f"{GITHUB_API}/search/repositories", params=params, user_token=x_github_token)
if res.status_code != 200:
raise HTTPException(status_code=502, detail="GitHub API error")
data = res.json()
projects = [
{
"repo_name": item.get("full_name"),
"description": item.get("description"),
"stars": item.get("stargazers_count"),
"primary_language": item.get("language"),
"topics": item.get("topics", []),
"url": item.get("html_url"),
}
for item in data.get("items", [])
]
result = {"total_count": data.get("total_count", 0), "projects": projects, "page": page}
cache_set(cache_key, result, ttl=60)
return result
@router.get("/api/project_rigor/{owner}/{repo}", response_model=RigorResult)
async def project_rigor(owner: str, repo: str, x_github_token: str = Header(None)):
validate_username(owner)
validate_param(repo, "repo")
res = await get_github_data(f"{GITHUB_API}/repos/{owner}/{repo}", user_token=x_github_token)
if res.status_code != 200:
return {"grade": "N/A", "raw_score": 0.0}
data = res.json()
score = min(100.0, data.get("stargazers_count", 0) * 0.5 + data.get("forks_count", 0) * 2.0)
grade = "🟢 A+" if score > 80 else ("🟡 B" if score > 40 else "🔴 C")
return {"grade": grade, "raw_score": round(score, 2)}
@router.get("/api/project_preview/{owner}/{repo}")
async def project_preview(owner: str, repo: str, x_github_token: str = Header(None)):
validate_username(owner)
validate_param(repo, "repo")
cached = cache_get(f"preview:{owner}/{repo}")
if cached:
return cached
res = await get_github_data(f"{GITHUB_API}/repos/{owner}/{repo}", user_token=x_github_token)
if res.status_code != 200:
raise HTTPException(status_code=502, detail="Could not fetch repo info.")
data = res.json()
homepage = (data.get("homepage") or "").strip()
has_pages = data.get("has_pages", False)
site_url = None
if homepage and homepage.startswith("http"):
site_url = homepage
elif has_pages:
site_url = f"https://{owner}.github.io/{repo}/"
result = {
"site_url": site_url,
"has_pages": has_pages,
"homepage": homepage or None,
"repo_url": data.get("html_url"),
"name": data.get("full_name"),
}
cache_set(f"preview:{owner}/{repo}", result, ttl=300)
return result
@router.get("/api/heatmap/{username}")
async def contribution_heatmap(username: str, x_github_token: str = Header(None)):
validate_username(username)
cached = cache_get(f"heatmap:{username}")
if cached:
return cached
res = await get_github_data(
f"{GITHUB_API}/users/{username}/events/public",
params={"per_page": 100},
user_token=x_github_token
)
if res.status_code != 200:
raise HTTPException(status_code=502, detail="Could not fetch events.")
events = res.json()
from collections import defaultdict
from datetime import datetime, timedelta
counts: dict = defaultdict(int)
for e in events:
if e.get("type") == "PushEvent":
day = (e.get("created_at") or "")[:10]
if day:
commits = len(e.get("payload", {}).get("commits", []))
counts[day] += max(commits, 1)
today = datetime.utcnow().date()
start = today - timedelta(weeks=52)
grid = []
d = start
while d <= today:
ds = d.isoformat()
grid.append({"date": ds, "count": counts.get(ds, 0)})
d += timedelta(days=1)
result = {"grid": grid, "total_contributions": sum(counts.values())}
cache_set(f"heatmap:{username}", result, ttl=300)
return result
@router.get("/api/market_trends")
async def market_trends(x_github_token: str = Header(None)):
cached = cache_get("trends_v2")
if cached:
return cached
categories = {
"languages": ["Python", "JavaScript", "TypeScript", "Go", "Rust", "Java", "C++", "Swift"],
"frameworks": ["React", "Next.js", "FastAPI", "Django", "Vue", "Flutter"],
"cloud": ["AWS", "Docker", "Kubernetes", "Terraform", "Azure", "GCP"],
"ml_ecosystem": ["PyTorch", "TensorFlow", "Hugging Face", "LangChain", "scikit-learn"]
}
trends = {cat: {} for cat in categories}
async def fetch_count(cat, topic):
try:
# Using broader search for more realistic "market presence" counts
res = await get_github_data(
f"{GITHUB_API}/search/repositories",
params={"q": f"{topic} in:readme,description", "per_page": 1},
user_token=x_github_token
)
if res.status_code == 200:
count = res.json().get("total_count", 0)
return cat, topic, count
except Exception:
pass
return cat, topic, 0
tasks = []
for cat, topics in categories.items():
for t in topics:
tasks.append(fetch_count(cat, t))
results = await asyncio.gather(*tasks)
for cat, topic, count in results:
trends[cat][topic] = count
result = {"trends": trends, "last_updated": asyncio.get_event_loop().time()}
cache_set("trends_v2", result, ttl=3600) # Cache for 1 hour
return result