Spaces:
Running
Running
| import libsql | |
| import os | |
| import asyncio | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| from fastapi import FastAPI, Query, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from typing import Optional, List, Dict, Any | |
| from contextlib import asynccontextmanager | |
| from datetime import datetime | |
# Global database connection.
# Starts as None; lifespan() assigns the libsql connection during startup, and
# the sync task, check_db_connection() and the endpoints read this name directly.
db_conn = None
async def sync_periodically(interval_seconds: int = 3600):
    """Loop forever, syncing the global database after every delay.

    Each iteration first sleeps ``interval_seconds`` and then calls
    ``db_conn.sync()``. A failing sync is reported with ``print`` and does not
    end the loop.

    NOTE(review): ``sync()`` is invoked directly inside the coroutine; if it
    blocks on network I/O the event loop is stalled for that long -- confirm.
    """

    def _attempt_sync():
        # Best effort: report the outcome either way and keep the loop alive.
        try:
            db_conn.sync()
            print(f"Database synced at {datetime.now()}")
        except Exception as sync_error:
            print(f"Sync error: {sync_error}")

    while True:
        await asyncio.sleep(interval_seconds)
        _attempt_sync()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the application lifecycle: open the DB on startup, close it on shutdown.

    Startup reads ``DATABASE_URL`` and ``API_KEY`` from the environment, opens
    the local replica ``zigistry-main.db`` against the remote, performs an
    initial sync and starts the hourly background sync task. Shutdown cancels
    that task and closes the connection.

    Raises:
        RuntimeError: If either environment variable is missing, or if
            connecting / the initial sync fails (original error is chained).
    """
    global db_conn
    url = os.getenv("DATABASE_URL")
    auth_token = os.getenv("API_KEY")
    if not url or not auth_token:
        raise RuntimeError("DATABASE_URL and API_KEY environment variables must be set")
    try:
        db_conn = libsql.connect(
            "zigistry-main.db", sync_url=url, auth_token=auth_token
        )
        db_conn.sync()
        print("Database connection established successfully")
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise RuntimeError(f"Failed to connect to database: {e}") from e
    sync_task = asyncio.create_task(sync_periodically(3600))
    try:
        yield
    finally:
        # Runs on normal shutdown and when an error is thrown into the
        # context manager, so the task and connection are always released.
        sync_task.cancel()
        try:
            await sync_task
        except asyncio.CancelledError:
            pass
        # Cleanup on shutdown
        if db_conn:
            try:
                db_conn.close()
            except Exception as e:
                print(f"Error closing database connection: {e}")
# FastAPI application object; lifespan() is registered as the startup/shutdown handler.
app = FastAPI(
    title="Zigistry API",
    description="API for searching and browsing Zig packages and programs",
    version="1.0.0",
    lifespan=lifespan,
)
# CORS middleware: every origin, method and header is allowed.
# NOTE(review): allow_origins=["*"] is combined with allow_credentials=True.
# FastAPI's CORS documentation advises against wildcards when credentials are
# enabled -- confirm whether credentialed cross-origin requests are needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# SQL fragment reused across all list/search queries.
# Selects columns 0-17 read by row_to_repo_dict(), in the order it indexes them.
# Column 18 (dependents_count) is NOT part of this fragment: each query that
# feeds row_to_repo_dict() appends it as an extra COUNT(*) subquery.
# The fragment references the aliases `r` (repos) and `u` (users), so it must
# be used together with _REPO_JOIN_USERS.
# 0 id
# 1 avatar_id ← joined from users (repos has no avatar_id column)
# 2 owner
# 3 platform
# 4 description
# 5 issues_count
# 6 default_branch_name
# 7 fork_count
# 8 stargazer_count
# 9 watchers_count
# 10 pushed_at
# 11 created_at
# 12 is_archived
# 13 is_disabled
# 14 is_fork
# 15 license
# 16 primary_language
# 17 minimum_zig_version ← subquery from releases (repos has no such column)
# ---------------------------------------------------------------------------
_REPO_COLUMNS = """
r.id,
u.avatar_id,
r.owner,
r.platform,
r.description,
r.issues_count,
r.default_branch_name,
r.fork_count,
r.stargazer_count,
r.watchers_count,
r.pushed_at,
r.created_at,
r.is_archived,
r.is_disabled,
r.is_fork,
r.license,
r.primary_language,
(
SELECT minimum_zig_version
FROM releases
WHERE repo_id = r.id
ORDER BY published_at DESC
LIMIT 1
) AS minimum_zig_version
"""
# Join that provides the `u` alias used by _REPO_COLUMNS (repos.owner -> users.id).
_REPO_JOIN_USERS = "LEFT JOIN users u ON r.owner = u.id"
def get_default_branch_info(conn, repo_id: str, default_branch: str) -> Dict[str, Any]:
    """Describe the default branch, using the newest release as a stand-in.

    Looks up the release of ``repo_id`` with the latest ``published_at``.

    Returns:
        A dict with ``minimum_zig_version`` (``"0.0.0"`` when unknown),
        ``dependencies`` and ``branch_name``. When a release exists it also
        carries ``latest_version`` and ``readme_url``.
    """
    latest_release_sql = """
        SELECT
            id,
            version,
            published_at,
            minimum_zig_version,
            readme_url,
            is_prerelease
        FROM releases
        WHERE repo_id = ?
        ORDER BY published_at DESC
        LIMIT 1
    """
    latest = conn.execute(latest_release_sql, (repo_id,)).fetchone()
    if latest:
        # published_at / is_prerelease are selected but not reported here.
        release_id, tag, _published_at, min_zig, readme, _is_prerelease = latest
        deps = _fetch_dependencies(conn, release_id)
        return {
            "minimum_zig_version": min_zig or "0.0.0",
            "dependencies": deps,
            "branch_name": default_branch,
            "latest_version": tag,
            "readme_url": readme,
        }
    # No releases at all: fall back to placeholder values.
    return {
        "minimum_zig_version": "0.0.0",
        "dependencies": [],
        "branch_name": default_branch,
    }
def get_version_info(conn, repo_id: str, version: str) -> Dict[str, Any]:
    """Return the details of one specific release of a repository.

    Returns:
        A dict with ``version``, ``published_at`` (string or ``None``),
        ``minimum_zig_version`` (``"0.0.0"`` when unset), ``readme_url``,
        ``is_prerelease`` and ``dependencies``.

    Raises:
        HTTPException: 404 when no release matches ``repo_id`` and ``version``.
    """
    release_sql = """
        SELECT
            id,
            version,
            published_at,
            minimum_zig_version,
            readme_url,
            is_prerelease
        FROM releases
        WHERE repo_id = ? AND version = ?
    """
    found = conn.execute(release_sql, (repo_id, version)).fetchone()
    if found:
        release_id, tag, published, min_zig, readme, prerelease = found
        deps = _fetch_dependencies(conn, release_id)
        return {
            "version": tag,
            "published_at": str(published) if published else None,
            "minimum_zig_version": min_zig or "0.0.0",
            "readme_url": readme,
            "is_prerelease": bool(prerelease),
            "dependencies": deps,
        }
    raise HTTPException(
        status_code=404,
        detail=f"Version '{version}' not found for repository '{repo_id}'",
    )
| def _fetch_dependencies(conn, release_id: int) -> List[Dict[str, Any]]: | |
| """Return all dependencies for a given release id. | |
| Note: the schema column is `is_lazy`, not `lazy`. | |
| """ | |
| sql = """ | |
| SELECT name, url, hash, is_lazy, path | |
| FROM release_dependencies | |
| WHERE release_id = ? | |
| """ | |
| cursor = conn.execute(sql, (release_id,)) | |
| return [ | |
| { | |
| "name": row[0], | |
| "url": row[1], | |
| "hash": row[2], | |
| "lazy": bool(row[3]) if row[3] is not None else False, | |
| "path": row[4], | |
| } | |
| for row in cursor.fetchall() | |
| ] | |
def row_to_repo_dict(row: tuple) -> Dict[str, Any]:
    """Turn one repository result row into the dict returned to API clients.

    The row is read by position:
        0 id, 1 avatar_id, 2 owner, 3 platform, 4 description,
        5 issues_count, 6 default_branch_name, 7 fork_count,
        8 stargazer_count, 9 watchers_count, 10 pushed_at, 11 created_at,
        12 is_archived, 13 is_disabled, 14 is_fork, 15 license,
        16 primary_language, 17 minimum_zig_version, 18 dependents_count

    Derived fields: ``provider`` is ``"cb"`` only when the platform mentions
    codeberg and not github (otherwise ``"gh"``); ``repo_name`` is the text
    after the last ``/`` of the id. Note that column 1 (avatar_id) is exposed
    under the key ``avatar_url`` and ``owner_name`` repeats column 2.
    """

    def _text_or_none(value):
        # Timestamps are stringified; falsy values become None.
        return str(value) if value else None

    full_id = row[0]
    platform_lower = (row[3] or "").lower()
    codeberg_only = "github" not in platform_lower and "codeberg" in platform_lower
    min_zig = row[17]
    dependents = row[18]

    return {
        "id": full_id,
        "owner_name": row[2],
        "provider": "cb" if codeberg_only else "gh",
        "repo_name": full_id.rsplit("/", 1)[-1],
        "avatar_url": row[1],
        "owner": row[2],
        "platform": row[3],
        "description": row[4],
        "issues_count": row[5],
        "default_branch_name": row[6],
        "fork_count": row[7],
        "stargazer_count": row[8],
        "watchers_count": row[9],
        "pushed_at": _text_or_none(row[10]),
        "created_at": _text_or_none(row[11]),
        "is_archived": bool(row[12]),
        "is_disabled": bool(row[13]),
        "is_fork": bool(row[14]),
        "license": row[15],
        "primary_language": row[16],
        "minimum_zig_version": min_zig or "0.0.0",
        "dependents_count": 0 if dependents is None else dependents,
    }
def get_type_filter(search_type: str) -> str:
    """Return the ``AND ...`` SQL fragment restricting results to a repo kind.

    ``"package"`` and ``"program"`` select rows present in the matching
    LEFT JOINed table (aliases ``pkg`` / ``prog``); any other value keeps rows
    that are in either table.
    """
    single_kind_filters = {
        "package": "AND pkg.repo_id IS NOT NULL",
        "program": "AND prog.repo_id IS NOT NULL",
    }
    either_kind_filter = "AND (pkg.repo_id IS NOT NULL OR prog.repo_id IS NOT NULL)"
    return single_kind_filters.get(search_type, either_kind_filter)
def search_repos(
    conn, query: str, search_type: str = "all", per_page: int = 15, page: int = 1
) -> Dict[str, Any]:
    """Search repositories by query string with pagination.

    A repository matches when the full-text index (``repo_search``) matches
    every whitespace-separated word as a prefix, or when its id, owner,
    description or primary language contains the raw query (LIKE). Disabled
    repositories are excluded and ``search_type`` narrows to packages/programs.

    Results are ordered: exact id match, then ids starting with the query,
    then everything else; within each group by stars descending, with the id
    as a final tiebreaker so LIMIT/OFFSET pages are stable.

    Args:
        conn: Database connection exposing ``execute(sql, params)``.
        query: User-supplied search text.
        search_type: ``"package"``, ``"program"`` or anything else for both.
        per_page: Page size, clamped to 1..15.
        page: 1-based page number; clamped to the last available page.

    Returns:
        Dict with ``items``, ``total``, ``page``, ``per_page``,
        ``total_pages``, ``has_previous_page`` and ``has_next_page``.
    """
    actual_per_page = min(max(per_page, 1), 15)
    requested_page = max(page, 1)

    empty_page = {
        "items": [],
        "total": 0,
        "page": 1,
        "per_page": actual_per_page,
        "total_pages": 0,
        "has_previous_page": False,
        "has_next_page": False,
    }

    words = query.split()
    if not words:
        # A blank/whitespace-only query would yield an empty MATCH expression,
        # which SQLite FTS5 rejects as a syntax error (assuming repo_search is
        # an FTS5 table -- TODO confirm). Treat it as "no results" instead.
        return empty_page

    search_term_like = f"%{query}%"
    # Quote each word (doubling embedded quotes) and add '*' for prefix match.
    fts_query = " ".join('"' + word.replace('"', '""') + '"*' for word in words)
    type_filter = get_type_filter(search_type)

    # Shared by the count query and the page query so both see the same set.
    matching_ctes = f"""
        WITH matched_ids AS (
            SELECT repo_id FROM repo_search WHERE keywords MATCH ?1
            UNION
            SELECT id FROM repos
            WHERE id LIKE ?2 OR owner LIKE ?2 OR description LIKE ?2 OR primary_language LIKE ?2
        ),
        filtered_repos AS (
            SELECT DISTINCT r.id
            FROM repos r
            JOIN matched_ids mi ON r.id = mi.repo_id
            LEFT JOIN packages pkg ON r.id = pkg.repo_id
            LEFT JOIN programs prog ON r.id = prog.repo_id
            WHERE
                r.is_disabled = 0
                {type_filter}
        )
    """

    count_sql = f"""
        {matching_ctes}
        SELECT COUNT(*) FROM filtered_repos
    """
    total_count = conn.execute(count_sql, (fts_query, search_term_like)).fetchone()[0] or 0
    if total_count == 0:
        # Nothing matched: skip the page query entirely.
        return empty_page

    total_pages = (total_count + actual_per_page - 1) // actual_per_page
    current_page = min(requested_page, total_pages)
    offset = (current_page - 1) * actual_per_page

    page_sql = f"""
        {matching_ctes},
        repo_data AS (
            SELECT
                {_REPO_COLUMNS}
            FROM repos r
            {_REPO_JOIN_USERS}
            JOIN filtered_repos fr ON r.id = fr.id
        )
        SELECT
            rd.*,
            (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) AS dependents_count
        FROM repo_data rd
        ORDER BY
            CASE
                WHEN rd.id = ?3 THEN 1
                WHEN rd.id LIKE ?4 THEN 2
                ELSE 3
            END,
            rd.stargazer_count DESC,
            rd.id ASC
        LIMIT ?5 OFFSET ?6
    """
    starts_with = f"{query}%"
    rows = conn.execute(
        page_sql,
        (fts_query, search_term_like, query, starts_with, actual_per_page, offset),
    ).fetchall()

    return {
        "items": [row_to_repo_dict(row) for row in rows],
        "total": total_count,
        "page": current_page,
        "per_page": actual_per_page,
        "total_pages": total_pages,
        "has_previous_page": current_page > 1,
        "has_next_page": current_page < total_pages,
    }
def get_latest_repos(
    conn, search_type: str = "all", limit: int = 10
) -> List[Dict[str, Any]]:
    """Get the most recently created repositories, newest first.

    Disabled repositories are excluded and ``search_type`` narrows the result
    to packages or programs (see ``get_type_filter``).

    Args:
        conn: Database connection exposing ``execute(sql, params)``.
        search_type: ``"package"``, ``"program"`` or anything else for both.
        limit: Maximum number of rows to return.

    Returns:
        Repository dicts as produced by ``row_to_repo_dict``.
    """
    type_filter = get_type_filter(search_type)
    # The ordering is stated twice on purpose: inside the CTE it decides which
    # rows survive the LIMIT, and on the outer SELECT it fixes the order in
    # which they are returned (SQL gives no ordering guarantee without it).
    # The id tiebreaker makes both deterministic when created_at values tie.
    sql = f"""
        WITH repo_data AS (
            SELECT
                {_REPO_COLUMNS}
            FROM repos r
            {_REPO_JOIN_USERS}
            LEFT JOIN packages pkg ON r.id = pkg.repo_id
            LEFT JOIN programs prog ON r.id = prog.repo_id
            WHERE
                r.is_disabled = 0
                {type_filter}
            ORDER BY r.created_at DESC, r.id ASC
            LIMIT ?1
        )
        SELECT
            rd.*,
            (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) AS dependents_count
        FROM repo_data rd
        ORDER BY rd.created_at DESC, rd.id ASC
    """
    rows = conn.execute(sql, (limit,)).fetchall()
    return [row_to_repo_dict(row) for row in rows]
def get_scroll_repos(
    conn, search_type: str = "all", per_page: int = 20, page: int = 1
) -> List[Dict[str, Any]]:
    """Get one page of repositories ordered by star count (then id).

    Disabled repositories are excluded and ``search_type`` narrows the result
    to packages or programs (see ``get_type_filter``).

    Args:
        conn: Database connection exposing ``execute(sql, params)``.
        search_type: ``"package"``, ``"program"`` or anything else for both.
        per_page: Page size, clamped to 1..20.
        page: 1-based page number; values below 1 are treated as 1.

    Returns:
        Repository dicts as produced by ``row_to_repo_dict``; empty when the
        page lies beyond the last row.
    """
    # Clamp both ends, as search_repos does: in SQLite a negative LIMIT means
    # "no limit", so an unchecked per_page could return the entire table.
    actual_per_page = min(max(per_page, 1), 20)
    offset = (max(page, 1) - 1) * actual_per_page
    type_filter = get_type_filter(search_type)
    # The inner ORDER BY selects the page; the outer one guarantees the rows
    # are also returned in that order.
    sql = f"""
        WITH repo_data AS (
            SELECT
                {_REPO_COLUMNS}
            FROM repos r
            {_REPO_JOIN_USERS}
            LEFT JOIN packages pkg ON r.id = pkg.repo_id
            LEFT JOIN programs prog ON r.id = prog.repo_id
            WHERE
                r.is_disabled = 0
                {type_filter}
            ORDER BY r.stargazer_count DESC, r.id ASC
            LIMIT ?1 OFFSET ?2
        )
        SELECT
            rd.*,
            (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) AS dependents_count
        FROM repo_data rd
        ORDER BY rd.stargazer_count DESC, rd.id ASC
    """
    rows = conn.execute(sql, (actual_per_page, offset)).fetchall()
    return [row_to_repo_dict(row) for row in rows]
def get_section_repos(conn, section_name: str, limit: int = 50) -> List[Dict[str, Any]]:
    """List the repositories assigned to one curated index section.

    Only enabled repositories that are a package or a program are returned,
    capped at ``limit`` rows. The query applies no ORDER BY, so which rows
    fall inside the cap (and their order) is left to the database.
    """
    section_sql = f"""
        WITH section_repos AS (
            SELECT repo_id FROM index_sections WHERE section_name = ?1
        ),
        repo_data AS (
            SELECT
                {_REPO_COLUMNS}
            FROM repos r
            {_REPO_JOIN_USERS}
            JOIN section_repos sr ON r.id = sr.repo_id
            LEFT JOIN packages pkg ON r.id = pkg.repo_id
            LEFT JOIN programs prog ON r.id = prog.repo_id
            WHERE
                r.is_disabled = 0
                AND (pkg.repo_id IS NOT NULL OR prog.repo_id IS NOT NULL)
            LIMIT ?2
        )
        SELECT
            rd.*,
            (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) AS dependents_count
        FROM repo_data rd
    """
    section_rows = conn.execute(section_sql, (section_name, limit)).fetchall()
    repos = []
    for section_row in section_rows:
        repos.append(row_to_repo_dict(section_row))
    return repos
def parse_repo_id(repo_id: str) -> tuple[str, str]:
    """Split a repository id into ``(owner, repo)``.

    Accepts ``provider/owner/repo`` (the provider segment is dropped) or
    ``owner/repo``.

    Raises:
        HTTPException: 400 when the id has any other number of segments.
    """
    segments = repo_id.split("/")
    if len(segments) not in (2, 3):
        raise HTTPException(
            status_code=400,
            detail="Invalid repo_id format. Expected 'provider/owner/repo' or 'owner/repo'",
        )
    # In both accepted shapes the owner and repo are the last two segments.
    owner_part, repo_part = segments[-2:]
    return owner_part, repo_part
def get_repo_details(
    conn, repo_id: str, version: Optional[str] = None
) -> Dict[str, Any]:
    """Assemble the full detail payload for one repository.

    The payload holds the repository's own columns, derived identifiers
    (``provider_id``, ``owner_name``, ``repo_name``), duplicate star/fork
    counters, every release tag (newest first) and the list of dependents.
    It is then extended with either:

    - ``version`` falsy: default-branch info, taken from the latest release;
    - ``version`` given: the data of exactly that release.

    Raises:
        HTTPException: 400 for a malformed ``repo_id``; 404 when the
            repository or the requested version does not exist.
    """
    owner_name, repo_name = parse_repo_id(repo_id)

    # avatar_id lives on users, not repos; minimum_zig_version comes from
    # releases and is filled in further down.
    repo_sql = f"""
        SELECT
            r.id,
            u.avatar_id,
            r.owner,
            r.platform,
            r.description,
            r.issues_count,
            r.default_branch_name,
            r.fork_count,
            r.stargazer_count,
            r.watchers_count,
            r.pushed_at,
            r.created_at,
            r.is_archived,
            r.is_disabled,
            r.is_fork,
            r.license,
            r.primary_language
        FROM repos r
        {_REPO_JOIN_USERS}
        WHERE r.id = ?
    """
    repo_row = conn.execute(repo_sql, (repo_id,)).fetchone()
    if not repo_row:
        raise HTTPException(status_code=404, detail="Repository not found")

    # Response keys, listed in the same order as the SELECT above.
    column_keys = (
        "id",
        "avatar_id",
        "owner",
        "platform",
        "description",
        "issues_count",
        "default_branch_name",
        "fork_count",
        "stargazer_count",
        "watchers_count",
        "pushed_at",
        "created_at",
        "is_archived",
        "is_disabled",
        "is_fork",
        "license",
        "primary_language",
    )
    response = dict(zip(column_keys, repo_row))
    default_branch = response["default_branch_name"]

    # Normalise in place (assigning to an existing key keeps its position).
    for stamp_key in ("pushed_at", "created_at"):
        stamp = response[stamp_key]
        response[stamp_key] = str(stamp) if stamp else None
    for flag_key in ("is_archived", "is_disabled", "is_fork"):
        response[flag_key] = bool(response[flag_key])

    # "cb" only for codeberg platforms that do not also mention github.
    platform_lower = (response["platform"] or "").lower()
    codeberg_only = "github" not in platform_lower and "codeberg" in platform_lower

    # All releases for this repo
    releases_sql = """
        SELECT version
        FROM releases
        WHERE repo_id = ?
        ORDER BY published_at DESC
    """
    release_tags = [
        release_row[0]
        for release_row in conn.execute(releases_sql, (repo_id,)).fetchall()
        if release_row[0]
    ]

    # Dependents (version-agnostic)
    dependent_rows = conn.execute(
        "SELECT dependent FROM repo_dependents WHERE repo_id = ?", (repo_id,)
    ).fetchall()

    response.update(
        {
            "provider_id": "cb" if codeberg_only else "gh",
            "owner_name": owner_name,
            "repo_name": repo_name,
            "stars_count": response["stargazer_count"],
            "forks_count": response["fork_count"],
            "releases": release_tags,
            "dependents": [dependent_row[0] for dependent_row in dependent_rows],
        }
    )

    if not version:
        branch_info = get_default_branch_info(conn, repo_id, default_branch)
        response.update(
            {
                "minimum_zig_version": branch_info["minimum_zig_version"],
                "dependencies": branch_info["dependencies"],
                "branch_name": branch_info["branch_name"],
                "latest_version": branch_info.get("latest_version"),
                "readme_url": branch_info.get("readme_url"),
                "version": None,  # Indicates default branch, not a specific version
            }
        )
        return response

    version_info = get_version_info(conn, repo_id, version)
    copied_keys = (
        "version",
        "published_at",
        "minimum_zig_version",
        "readme_url",
        "is_prerelease",
        "dependencies",
    )
    for copied_key in copied_keys:
        response[copied_key] = version_info[copied_key]
    response["requested_version"] = version
    return response
def get_user_details(conn, user_id: str) -> Dict[str, Any]:
    """Return a user's profile plus their enabled repositories.

    Repositories are sorted by stars (descending) and split into ``packages``
    and ``programs``; a repository registered as both appears in both lists.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    profile_row = conn.execute(
        "SELECT id, avatar_id, platform, bio FROM users WHERE id = ?", (user_id,)
    ).fetchone()
    if not profile_row:
        raise HTTPException(status_code=404, detail="User not found")

    # avatar_id must come from users; minimum_zig_version from releases subquery.
    repos_sql = f"""
        SELECT
            {_REPO_COLUMNS},
            (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = r.id) AS dependents_count,
            pkg.repo_id AS is_package,
            prog.repo_id AS is_program
        FROM repos r
        {_REPO_JOIN_USERS}
        LEFT JOIN packages pkg ON r.id = pkg.repo_id
        LEFT JOIN programs prog ON r.id = prog.repo_id
        WHERE r.owner = ? AND r.is_disabled = 0
        ORDER BY r.stargazer_count DESC
    """
    owned_rows = conn.execute(repos_sql, (user_id,)).fetchall()

    package_items = []
    program_items = []
    for owned in owned_rows:
        # Columns 0-18 are the standard repo columns + dependents_count;
        # 19 and 20 are non-NULL when the repo is a package / a program.
        entry = row_to_repo_dict(owned[:19])
        if owned[19]:
            package_items.append(entry)
        if owned[20]:
            program_items.append(entry)

    return {
        "id": profile_row[0],
        "avatar_id": profile_row[1],
        "platform": profile_row[2],
        "bio": profile_row[3],
        "packages": package_items,
        "programs": program_items,
    }
def check_db_connection():
    """Fail the request with HTTP 503 unless the global connection is set."""
    if db_conn:
        return
    raise HTTPException(status_code=503, detail="Database not initialized")
| # --------------------------------------------------------------------------- | |
| # API Endpoints | |
| # --------------------------------------------------------------------------- | |
async def search_packages_endpoint(
    q: str = Query(..., min_length=1, description="Search query"),
    per_page: int = Query(15, ge=1, le=15, description="Items per page"),
    page: int = Query(1, ge=1, description="Page number"),
):
    """Search Zig packages and return one page of matches."""
    check_db_connection()
    result_page = search_repos(
        db_conn,
        q,
        search_type="package",
        per_page=per_page,
        page=page,
    )
    return result_page
async def search_programs_endpoint(
    q: str = Query(..., min_length=1, description="Search query"),
    per_page: int = Query(15, ge=1, le=15, description="Items per page"),
    page: int = Query(1, ge=1, description="Page number"),
):
    """Search Zig programs and return one page of matches."""
    check_db_connection()
    result_page = search_repos(
        db_conn,
        q,
        search_type="program",
        per_page=per_page,
        page=page,
    )
    return result_page
async def get_package_index_details_endpoint():
    """
    Everything the package index page needs in a single response.

    Contains the 10 newest packages (``latest``), the 10 most-starred packages
    (``most_used``) and up to 50 repositories for each of the curated
    ``games``, ``gui`` and ``web`` sections.
    """
    check_db_connection()
    payload = {
        "latest": get_latest_repos(db_conn, search_type="package", limit=10),
        "most_used": get_scroll_repos(db_conn, search_type="package", per_page=10, page=1),
    }
    # Curated sections are queried in this fixed order, one key per section.
    for section in ("games", "gui", "web"):
        payload[section] = get_section_repos(db_conn, section, limit=50)
    return payload
async def get_program_index_details_endpoint():
    """
    Everything the program index page needs in a single response.

    Contains the 10 newest programs (``latest``) and the 10 most-starred
    programs (``most_used``).
    """
    check_db_connection()
    newest = get_latest_repos(db_conn, search_type="program", limit=10)
    most_starred = get_scroll_repos(db_conn, search_type="program", per_page=10, page=1)
    return {"latest": newest, "most_used": most_starred}
async def scroll_packages_endpoint(
    per_page: int = Query(20, ge=1, le=20, description="Items per page"),
    page: int = Query(1, ge=1, description="Page number"),
):
    """Return one page of packages, most-starred first."""
    check_db_connection()
    package_page = get_scroll_repos(
        db_conn,
        search_type="package",
        per_page=per_page,
        page=page,
    )
    return package_page
async def scroll_programs_endpoint(
    per_page: int = Query(20, ge=1, le=20, description="Items per page"),
    page: int = Query(1, ge=1, description="Page number"),
):
    """Return one page of programs, most-starred first."""
    check_db_connection()
    program_page = get_scroll_repos(
        db_conn,
        search_type="program",
        per_page=per_page,
        page=page,
    )
    return program_page
async def get_user_details_endpoint(
    q: str = Query(..., description="User ID (e.g., gh/username or cb/username)")
):
    """
    Profile of a single user together with their packages and programs.
    """
    check_db_connection()
    profile = get_user_details(db_conn, user_id=q)
    return profile
async def get_package_details_endpoint(
    q: str = Query(
        ..., alias="q", description="Repository ID (owner/repo or provider/owner/repo)"
    ),
    version: Optional[str] = Query(
        None, description="Specific version to fetch (omit for default branch)"
    ),
):
    """
    Full details for one package.

    - No ``version``: default-branch info (the latest release stands in for it)
    - With ``version``: the data of that specific release

    Examples:
    - /packages?q=gh/rohanvashisht1234/zorsig
    - /packages?q=gh/rohanvashisht1234/zorsig&version=0.0.1
    """
    check_db_connection()
    details = get_repo_details(db_conn, repo_id=q, version=version)
    return details
async def get_program_details_endpoint(
    q: str = Query(
        ..., alias="q", description="Repository ID (owner/repo or provider/owner/repo)"
    ),
    version: Optional[str] = Query(
        None, description="Specific version to fetch (omit for default branch)"
    ),
):
    """
    Full details for one program.

    - No ``version``: default-branch info
    - With ``version``: the data of that specific release

    Examples:
    - /programs?q=gh/owner/program
    - /programs?q=gh/owner/program&version=1.0.0
    """
    check_db_connection()
    details = get_repo_details(db_conn, repo_id=q, version=version)
    return details
async def health_check():
    """Report that the API is up and whether the global DB connection is set."""
    database_state = "connected" if db_conn else "disconnected"
    return {"status": "healthy", "database": database_state}
# Script entry point: serve the app with uvicorn when this file is run directly.
if __name__ == "__main__":
    # Imported here so uvicorn is only required for direct execution.
    import uvicorn
    # Listens on all interfaces; port 7860 is presumably what the hosting
    # platform expects -- confirm before changing.
    uvicorn.run(app, host="0.0.0.0", port=7860)