Spaces:
Running
Running
| import libsql | |
| import os | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| from fastapi import FastAPI, Query, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from typing import Optional, List, Dict, Any | |
| from contextlib import asynccontextmanager | |
| from datetime import datetime | |
| # Global database connection | |
| db_conn = None | |
| async def lifespan(app: FastAPI): | |
| """Manage application lifecycle - initialize DB on startup.""" | |
| global db_conn | |
| url = os.getenv("DATABASE_URL") | |
| auth_token = os.getenv("API_KEY") | |
| if not url or not auth_token: | |
| raise RuntimeError("DATABASE_URL and API_KEY environment variables must be set") | |
| try: | |
| db_conn = libsql.connect( | |
| "zigistry-main.db", sync_url=url, auth_token=auth_token | |
| ) | |
| db_conn.sync() | |
| print("Database connection established successfully") | |
| except Exception as e: | |
| raise RuntimeError(f"Failed to connect to database: {e}") | |
| yield | |
| # Cleanup on shutdown | |
| if db_conn: | |
| try: | |
| db_conn.close() | |
| except Exception as e: | |
| print(f"Error closing database connection: {e}") | |
| app = FastAPI( | |
| title="Zigistry API", | |
| description="API for searching and browsing Zig packages and programs", | |
| version="1.0.0", | |
| lifespan=lifespan, | |
| ) | |
| # CORS middleware | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| def get_default_branch_info(conn, repo_id: str, default_branch: str) -> Dict[str, Any]: | |
| """Get information about the default branch (build.zig.zon file). | |
| Args: | |
| conn: Database connection | |
| repo_id: Repository identifier | |
| default_branch: Name of the default branch | |
| Returns: | |
| Dictionary with default branch information including dependencies and minimum zig version | |
| """ | |
| # For default branch, we'll get the latest release info as a proxy | |
| # In a real implementation, you'd fetch build.zig.zon from the default branch | |
| sql = """ | |
| SELECT | |
| id, | |
| version, | |
| published_at, | |
| minimum_zig_version, | |
| readme_url, | |
| is_prerelease | |
| FROM releases | |
| WHERE repo_id = ? | |
| ORDER BY published_at DESC | |
| LIMIT 1 | |
| """ | |
| cursor = conn.execute(sql, (repo_id,)) | |
| release = cursor.fetchone() | |
| if not release: | |
| return { | |
| "minimum_zig_version": "0.0.0", | |
| "dependencies": [], | |
| "branch_name": default_branch, | |
| } | |
| release_id, version, published_at, min_zig_ver, readme_url, is_prerelease = release | |
| # Get dependencies for this release | |
| deps_sql = """ | |
| SELECT name, url, hash, lazy, path | |
| FROM release_dependencies | |
| WHERE release_id = ? | |
| """ | |
| cursor = conn.execute(deps_sql, (release_id,)) | |
| dep_rows = cursor.fetchall() | |
| dependencies = [ | |
| { | |
| "name": row[0], | |
| "url": row[1], | |
| "hash": row[2], | |
| "lazy": bool(row[3]) if row[3] else False, | |
| "path": row[4], | |
| } | |
| for row in dep_rows | |
| ] | |
| return { | |
| "minimum_zig_version": min_zig_ver or "0.0.0", | |
| "dependencies": dependencies, | |
| "branch_name": default_branch, | |
| "latest_version": version, | |
| "readme_url": readme_url, | |
| } | |
| def get_version_info(conn, repo_id: str, version: str) -> Dict[str, Any]: | |
| """Get information about a specific version/release. | |
| Args: | |
| conn: Database connection | |
| repo_id: Repository identifier | |
| version: Version string | |
| Returns: | |
| Dictionary with version-specific information | |
| Raises: | |
| HTTPException: If version not found | |
| """ | |
| sql = """ | |
| SELECT | |
| id, | |
| version, | |
| published_at, | |
| minimum_zig_version, | |
| readme_url, | |
| is_prerelease | |
| FROM releases | |
| WHERE repo_id = ? AND version = ? | |
| """ | |
| cursor = conn.execute(sql, (repo_id, version)) | |
| release = cursor.fetchone() | |
| if not release: | |
| raise HTTPException( | |
| status_code=404, | |
| detail=f"Version '{version}' not found for repository '{repo_id}'", | |
| ) | |
| release_id, ver, published_at, min_zig_ver, readme_url, is_prerelease = release | |
| # Get dependencies for this version | |
| deps_sql = """ | |
| SELECT name, url, hash, lazy, path | |
| FROM release_dependencies | |
| WHERE release_id = ? | |
| """ | |
| cursor = conn.execute(deps_sql, (release_id,)) | |
| dep_rows = cursor.fetchall() | |
| dependencies = [ | |
| { | |
| "name": row[0], | |
| "url": row[1], | |
| "hash": row[2], | |
| "lazy": bool(row[3]) if row[3] else False, | |
| "path": row[4], | |
| } | |
| for row in dep_rows | |
| ] | |
| return { | |
| "version": ver, | |
| "published_at": str(published_at) if published_at else None, | |
| "minimum_zig_version": min_zig_ver or "0.0.0", | |
| "readme_url": readme_url, | |
| "is_prerelease": bool(is_prerelease), | |
| "dependencies": dependencies, | |
| } | |
| def row_to_repo_dict(row: tuple) -> Dict[str, Any]: | |
| """Convert database row to repository dictionary. | |
| Handles the common transformation from SQL result to API response format. | |
| """ | |
| platform_raw = (row[3] or "").lower() | |
| provider = ( | |
| "gh" | |
| if "github" in platform_raw | |
| else ("cb" if "codeberg" in platform_raw else "gh") | |
| ) | |
| repo_id = row[0] | |
| repo_name = repo_id.split("/")[-1] if "/" in repo_id else repo_id | |
| return { | |
| "id": row[0], | |
| "owner_name": row[2], | |
| "provider": provider, | |
| "repo_name": repo_name, | |
| "avatar_url": row[1], | |
| "owner": row[2], | |
| "platform": row[3], | |
| "description": row[4], | |
| "issues_count": row[5], | |
| "default_branch_name": row[6], | |
| "fork_count": row[7], | |
| "stargazer_count": row[8], | |
| "watchers_count": row[9], | |
| "pushed_at": str(row[10]) if row[10] else None, | |
| "created_at": str(row[11]) if row[11] else None, | |
| "is_archived": bool(row[12]), | |
| "is_disabled": bool(row[13]), | |
| "is_fork": bool(row[14]), | |
| "license": row[15], | |
| "primary_language": row[16], | |
| "minimum_zig_version": row[17] if row[17] else "0.0.0", | |
| "dependents_count": row[18] if row[18] is not None else 0, | |
| } | |
| def get_type_filter(search_type: str) -> str: | |
| """Generate SQL WHERE clause filter based on search type.""" | |
| if search_type == "package": | |
| return "AND pkg.repo_id IS NOT NULL" | |
| elif search_type == "program": | |
| return "AND prog.repo_id IS NOT NULL" | |
| else: # all | |
| return "AND (pkg.repo_id IS NOT NULL OR prog.repo_id IS NOT NULL)" | |
| def search_repos( | |
| conn, query: str, search_type: str = "all", limit: int = 50 | |
| ) -> List[Dict[str, Any]]: | |
| """Search repositories by query string. | |
| Args: | |
| conn: Database connection | |
| query: Search query string | |
| search_type: Filter by 'package', 'program', or 'all' | |
| limit: Maximum number of results to return | |
| Returns: | |
| List of repository dictionaries with default branch minimum_zig_version | |
| """ | |
| search_term_like = f"%{query}%" | |
| # FTS5 query with prefix matching for each word | |
| fts_query = " ".join( | |
| '"' + word.replace('"', '""') + '"*' for word in query.split() if word | |
| ) | |
| type_filter = get_type_filter(search_type) | |
| sql = f""" | |
| WITH matched_ids AS ( | |
| SELECT repo_id FROM repo_search WHERE keywords MATCH ?1 | |
| UNION | |
| SELECT id FROM repos WHERE id LIKE ?2 OR owner LIKE ?2 OR description LIKE ?2 OR primary_language LIKE ?2 | |
| ), | |
| repo_data AS ( | |
| SELECT | |
| r.id, | |
| r.avatar_id, | |
| r.owner, | |
| r.platform, | |
| r.description, | |
| r.issues_count, | |
| r.default_branch_name, | |
| r.fork_count, | |
| r.stargazer_count, | |
| r.watchers_count, | |
| r.pushed_at, | |
| r.created_at, | |
| r.is_archived, | |
| r.is_disabled, | |
| r.is_fork, | |
| r.license, | |
| r.primary_language, | |
| r.minimum_zig_version | |
| FROM repos r | |
| JOIN matched_ids mi ON r.id = mi.repo_id | |
| LEFT JOIN packages pkg ON r.id = pkg.repo_id | |
| LEFT JOIN programs prog ON r.id = prog.repo_id | |
| WHERE | |
| r.is_disabled = 0 | |
| {type_filter} | |
| ) | |
| SELECT | |
| rd.*, | |
| (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) as dependents_count | |
| FROM repo_data rd | |
| ORDER BY | |
| CASE | |
| WHEN rd.id = ?3 THEN 1 | |
| WHEN rd.id LIKE ?4 THEN 2 | |
| ELSE 3 | |
| END, | |
| rd.stargazer_count DESC | |
| LIMIT ?5 | |
| """ | |
| starts_with = f"{query}%" | |
| # Parameters matches ?1 (FTS), ?2 (LIKE), ?3 (Exact), ?4 (Starts with), ?5 (Limit) | |
| cursor = conn.execute(sql, (fts_query, search_term_like, query, starts_with, limit)) | |
| rows = cursor.fetchall() | |
| return [row_to_repo_dict(row) for row in rows] | |
| def get_latest_repos( | |
| conn, search_type: str = "all", limit: int = 10 | |
| ) -> List[Dict[str, Any]]: | |
| """Get latest repositories ordered by creation date. | |
| Args: | |
| conn: Database connection | |
| search_type: Filter by 'package', 'program', or 'all' | |
| limit: Maximum number of results to return | |
| Returns: | |
| List of repository dictionaries with default branch minimum_zig_version | |
| """ | |
| type_filter = get_type_filter(search_type) | |
| sql = f""" | |
| WITH repo_data AS ( | |
| SELECT | |
| r.id, | |
| r.avatar_id, | |
| r.owner, | |
| r.platform, | |
| r.description, | |
| r.issues_count, | |
| r.default_branch_name, | |
| r.fork_count, | |
| r.stargazer_count, | |
| r.watchers_count, | |
| r.pushed_at, | |
| r.created_at, | |
| r.is_archived, | |
| r.is_disabled, | |
| r.is_fork, | |
| r.license, | |
| r.primary_language, | |
| r.minimum_zig_version | |
| FROM repos r | |
| LEFT JOIN packages pkg ON r.id = pkg.repo_id | |
| LEFT JOIN programs prog ON r.id = prog.repo_id | |
| WHERE | |
| r.is_disabled = 0 | |
| {type_filter} | |
| ORDER BY r.created_at DESC | |
| LIMIT ?1 | |
| ) | |
| SELECT | |
| rd.*, | |
| (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) as dependents_count | |
| FROM repo_data rd | |
| """ | |
| cursor = conn.execute(sql, (limit,)) | |
| rows = cursor.fetchall() | |
| return [row_to_repo_dict(row) for row in rows] | |
| def get_scroll_repos( | |
| conn, search_type: str = "all", per_page: int = 20, page: int = 1 | |
| ) -> List[Dict[str, Any]]: | |
| """Get paginated repositories ordered by star count. | |
| Args: | |
| conn: Database connection | |
| search_type: Filter by 'package', 'program', or 'all' | |
| per_page: Number of items per page (max 20) | |
| page: Page number (1-indexed) | |
| Returns: | |
| List of repository dictionaries with default branch minimum_zig_version | |
| """ | |
| actual_per_page = min(per_page, 20) | |
| offset = (max(page, 1) - 1) * actual_per_page | |
| type_filter = get_type_filter(search_type) | |
| sql = f""" | |
| WITH repo_data AS ( | |
| SELECT | |
| r.id, | |
| r.avatar_id, | |
| r.owner, | |
| r.platform, | |
| r.description, | |
| r.issues_count, | |
| r.default_branch_name, | |
| r.fork_count, | |
| r.stargazer_count, | |
| r.watchers_count, | |
| r.pushed_at, | |
| r.created_at, | |
| r.is_archived, | |
| r.is_disabled, | |
| r.is_fork, | |
| r.license, | |
| r.primary_language, | |
| r.minimum_zig_version | |
| FROM repos r | |
| LEFT JOIN packages pkg ON r.id = pkg.repo_id | |
| LEFT JOIN programs prog ON r.id = prog.repo_id | |
| WHERE | |
| r.is_disabled = 0 | |
| {type_filter} | |
| ORDER BY r.stargazer_count DESC, r.id ASC | |
| LIMIT ?1 OFFSET ?2 | |
| ) | |
| SELECT | |
| rd.*, | |
| (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) as dependents_count | |
| FROM repo_data rd | |
| """ | |
| cursor = conn.execute(sql, (actual_per_page, offset)) | |
| rows = cursor.fetchall() | |
| return [row_to_repo_dict(row) for row in rows] | |
| def parse_repo_id(repo_id: str) -> tuple[str, str]: | |
| """Parse repo_id into owner and repo name. | |
| Args: | |
| repo_id: Repository ID in format 'owner/repo' or 'provider/owner/repo' | |
| Returns: | |
| Tuple of (owner_name, repo_name) | |
| Raises: | |
| HTTPException: If repo_id format is invalid | |
| """ | |
| parts = repo_id.split("/") | |
| if len(parts) == 3: | |
| return parts[1], parts[2] | |
| elif len(parts) == 2: | |
| return parts[0], parts[1] | |
| else: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Invalid repo_id format. Expected 'provider/owner/repo' or 'owner/repo'", | |
| ) | |
| def get_repo_details( | |
| conn, repo_id: str, version: Optional[str] = None | |
| ) -> Dict[str, Any]: | |
| """Get detailed information about a repository. | |
| Args: | |
| conn: Database connection | |
| repo_id: Repository identifier | |
| version: Optional specific version to fetch details for | |
| Returns: | |
| Dictionary with complete repository details including: | |
| - If version is None: default branch information with dependencies | |
| - If version is specified: that specific version's information with dependencies | |
| Raises: | |
| HTTPException: If repository or version not found | |
| """ | |
| owner_name, repo_name = parse_repo_id(repo_id) | |
| # Get repository details | |
| repo_sql = """ | |
| SELECT | |
| id, avatar_id, owner, platform, description, issues_count, | |
| default_branch_name, fork_count, stargazer_count, watchers_count, | |
| pushed_at, created_at, is_archived, is_disabled, is_fork, | |
| license, primary_language, minimum_zig_version | |
| FROM repos WHERE id = ? | |
| """ | |
| cursor = conn.execute(repo_sql, (repo_id,)) | |
| repo_row = cursor.fetchone() | |
| if not repo_row: | |
| raise HTTPException(status_code=404, detail="Repository not found") | |
| ( | |
| r_id, | |
| avatar_id, | |
| owner, | |
| platform, | |
| desc, | |
| issues, | |
| default_branch, | |
| forks, | |
| stars, | |
| watchers, | |
| pushed_at, | |
| created_at, | |
| is_archived, | |
| is_disabled, | |
| is_fork, | |
| license_spdx, | |
| primary_language, | |
| min_zig_ver, | |
| ) = repo_row | |
| # Map platform to provider | |
| platform_raw = (platform or "").lower() | |
| provider_id = ( | |
| "gh" | |
| if "github" in platform_raw | |
| else ("cb" if "codeberg" in platform_raw else "gh") | |
| ) | |
| # Get all releases for the releases list | |
| releases_sql = """ | |
| SELECT version | |
| FROM releases | |
| WHERE repo_id = ? | |
| ORDER BY published_at DESC | |
| """ | |
| cursor = conn.execute(releases_sql, (repo_id,)) | |
| releases_list = [row[0] for row in cursor.fetchall() if row[0]] | |
| # Get dependents (same for all versions) | |
| dependents_sql = "SELECT dependent FROM repo_dependents WHERE repo_id = ?" | |
| cursor = conn.execute(dependents_sql, (repo_id,)) | |
| dependents = [row[0] for row in cursor.fetchall()] | |
| # Build response with either version-specific or default branch info | |
| response = { | |
| "id": r_id, | |
| "avatar_id": avatar_id, | |
| "owner": owner, | |
| "platform": platform, | |
| "description": desc, | |
| "issues_count": issues, | |
| "default_branch_name": default_branch, | |
| "fork_count": forks, | |
| "stargazer_count": stars, | |
| "watchers_count": watchers, | |
| "pushed_at": str(pushed_at) if pushed_at else None, | |
| "created_at": str(created_at) if created_at else None, | |
| "is_archived": bool(is_archived), | |
| "is_disabled": bool(is_disabled), | |
| "is_fork": bool(is_fork), | |
| "license": license_spdx, | |
| "primary_language": primary_language, | |
| "provider_id": provider_id, | |
| "owner_name": owner_name, | |
| "repo_name": repo_name, | |
| "stars_count": stars, | |
| "forks_count": forks, | |
| "releases": releases_list, | |
| "dependents": dependents, | |
| } | |
| if version: | |
| # Get specific version info | |
| version_info = get_version_info(conn, repo_id, version) | |
| response.update( | |
| { | |
| "version": version_info["version"], | |
| "published_at": version_info["published_at"], | |
| "minimum_zig_version": version_info["minimum_zig_version"], | |
| "readme_url": version_info["readme_url"], | |
| "is_prerelease": version_info["is_prerelease"], | |
| "dependencies": version_info["dependencies"], | |
| "requested_version": version, | |
| } | |
| ) | |
| else: | |
| # Get default branch info | |
| branch_info = get_default_branch_info(conn, repo_id, default_branch) | |
| response.update( | |
| { | |
| "minimum_zig_version": min_zig_ver | |
| or branch_info["minimum_zig_version"], | |
| "dependencies": branch_info["dependencies"], | |
| "branch_name": branch_info["branch_name"], | |
| "latest_version": branch_info.get("latest_version"), | |
| "readme_url": branch_info.get("readme_url"), | |
| "version": None, # Indicates default branch, not a specific version | |
| } | |
| ) | |
| return response | |
| def check_db_connection(): | |
| """Verify database connection is available.""" | |
| if not db_conn: | |
| raise HTTPException(status_code=503, detail="Database not initialized") | |
| # API Endpoints | |
| async def search_packages_endpoint( | |
| q: str = Query(..., min_length=1, description="Search query"), | |
| limit: int = Query(50, ge=1, le=100, description="Maximum results"), | |
| ): | |
| """Search for Zig packages. Returns default branch minimum_zig_version for each result.""" | |
| check_db_connection() | |
| return search_repos(db_conn, q, search_type="package", limit=limit) | |
| async def search_programs_endpoint( | |
| q: str = Query(..., min_length=1, description="Search query"), | |
| limit: int = Query(50, ge=1, le=100, description="Maximum results"), | |
| ): | |
| """Search for Zig programs. Returns default branch minimum_zig_version for each result.""" | |
| check_db_connection() | |
| return search_repos(db_conn, q, search_type="program", limit=limit) | |
| async def get_latest_packages_endpoint( | |
| limit: int = Query(10, ge=1, le=50, description="Number of packages"), | |
| ): | |
| """Get latest Zig packages. Returns default branch minimum_zig_version for each result.""" | |
| check_db_connection() | |
| return get_latest_repos(db_conn, search_type="package", limit=limit) | |
| async def get_latest_programs_endpoint( | |
| limit: int = Query(10, ge=1, le=50, description="Number of programs"), | |
| ): | |
| """Get latest Zig programs. Returns default branch minimum_zig_version for each result.""" | |
| check_db_connection() | |
| return get_latest_repos(db_conn, search_type="program", limit=limit) | |
| async def scroll_packages_endpoint( | |
| per_page: int = Query(20, ge=1, le=20, description="Items per page"), | |
| page: int = Query(1, ge=1, description="Page number"), | |
| ): | |
| """Get paginated list of packages sorted by stars. Returns default branch minimum_zig_version for each result.""" | |
| check_db_connection() | |
| return get_scroll_repos( | |
| db_conn, search_type="package", per_page=per_page, page=page | |
| ) | |
| async def scroll_programs_endpoint( | |
| per_page: int = Query(20, ge=1, le=20, description="Items per page"), | |
| page: int = Query(1, ge=1, description="Page number"), | |
| ): | |
| """Get paginated list of programs sorted by stars. Returns default branch minimum_zig_version for each result.""" | |
| check_db_connection() | |
| return get_scroll_repos( | |
| db_conn, search_type="program", per_page=per_page, page=page | |
| ) | |
| async def get_package_details_endpoint( | |
| q: str = Query( | |
| ..., alias="q", description="Repository ID (owner/repo or provider/owner/repo)" | |
| ), | |
| version: Optional[str] = Query( | |
| None, description="Specific version to fetch (omit for default branch)" | |
| ), | |
| ): | |
| """ | |
| Get detailed information about a package. | |
| - Without version parameter: Returns default branch information with dependencies | |
| - With version parameter: Returns that specific version's information with dependencies | |
| Examples: | |
| - /packages?q=gh/rohanvashisht1234/zorsig (default branch) | |
| - /packages?q=gh/rohanvashisht1234/zorsig&version=0.0.1 (specific version) | |
| """ | |
| check_db_connection() | |
| return get_repo_details(db_conn, q, version) | |
| async def get_program_details_endpoint( | |
| q: str = Query( | |
| ..., alias="q", description="Repository ID (owner/repo or provider/owner/repo)" | |
| ), | |
| version: Optional[str] = Query( | |
| None, description="Specific version to fetch (omit for default branch)" | |
| ), | |
| ): | |
| """ | |
| Get detailed information about a program. | |
| - Without version parameter: Returns default branch information with dependencies | |
| - With version parameter: Returns that specific version's information with dependencies | |
| Examples: | |
| - /programs?q=gh/owner/program (default branch) | |
| - /programs?q=gh/owner/program&version=1.0.0 (specific version) | |
| """ | |
| check_db_connection() | |
| return get_repo_details(db_conn, q, version) | |
| async def health_check(): | |
| """Check API health status.""" | |
| return {"status": "healthy", "database": "connected" if db_conn else "disconnected"} | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |