import libsql
import os
import asyncio
from dotenv import load_dotenv

# Load .env before anything reads the environment.
load_dotenv()

from fastapi import FastAPI, Query, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional, List, Dict, Any
from contextlib import asynccontextmanager
from datetime import datetime

# Global database connection (set by lifespan() on startup, cleared on shutdown)
db_conn = None


async def sync_periodically(interval_seconds: int = 3600):
    """Sync the database every `interval_seconds`.

    Runs forever until the task is cancelled. A failed sync is reported and
    the loop keeps going, so one transient error does not stop future syncs.
    """
    while True:
        await asyncio.sleep(interval_seconds)
        try:
            # NOTE(review): sync() is called directly on the event loop; if it
            # blocks on network I/O, requests stall for its duration — confirm.
            db_conn.sync()
            print(f"Database synced at {datetime.now()}")
        except Exception as e:
            print(f"Sync error: {e}")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifecycle - initialize DB on startup, release it on shutdown.

    Startup: reads DATABASE_URL and API_KEY from the environment, opens the
    local "zigistry-main.db" database with the remote sync URL, performs an
    initial sync and starts the hourly background sync task.

    Shutdown: cancels the sync task and closes the connection. The cleanup is
    in a ``finally`` so it also runs when the application exits with an error.

    Raises:
        RuntimeError: If the environment variables are missing or the
            connection / initial sync fails.
    """
    global db_conn

    url = os.getenv("DATABASE_URL")
    auth_token = os.getenv("API_KEY")
    if not url or not auth_token:
        raise RuntimeError("DATABASE_URL and API_KEY environment variables must be set")

    # Only publish the connection globally once it is fully usable, so a
    # failed initial sync never leaves a half-initialised global behind.
    conn = None
    try:
        conn = libsql.connect("zigistry-main.db", sync_url=url, auth_token=auth_token)
        conn.sync()
    except Exception as e:
        if conn is not None:
            try:
                conn.close()
            except Exception as close_err:
                print(f"Error closing database connection: {close_err}")
        raise RuntimeError(f"Failed to connect to database: {e}") from e

    db_conn = conn
    print("Database connection established successfully")

    sync_task = asyncio.create_task(sync_periodically(3600))
    try:
        yield
    finally:
        # Stop the background sync before closing the connection it uses.
        sync_task.cancel()
        try:
            await sync_task
        except asyncio.CancelledError:
            pass

        # Cleanup on shutdown
        if db_conn:
            try:
                db_conn.close()
            except Exception as e:
                print(f"Error closing database connection: {e}")
            finally:
                # Make /health and check_db_connection() see the closed state.
                db_conn = None


app = FastAPI(
    title="Zigistry API",
    description="API for searching and browsing Zig packages and programs",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS middleware
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm this is intended for a public read-only API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ---------------------------------------------------------------------------
# SQL fragment reused across all list/search queries.
# Selects every column row_to_repo_dict() expects, in the correct order:
#    0 id
#    1 avatar_id            ← joined from users (repos has no avatar_id column)
#    2 owner
#    3 platform
#    4 description
#    5 issues_count
#    6 default_branch_name
#    7 fork_count
#    8 stargazer_count
#    9 watchers_count
#   10 pushed_at
#   11 created_at
#   12 is_archived
#   13 is_disabled
#   14 is_fork
#   15 license
#   16 primary_language
#   17 minimum_zig_version  ← subquery from releases (repos has no such column)
# Callers append dependents_count as column 18.
# ---------------------------------------------------------------------------
_REPO_COLUMNS = """
    r.id,
    u.avatar_id,
    r.owner,
    r.platform,
    r.description,
    r.issues_count,
    r.default_branch_name,
    r.fork_count,
    r.stargazer_count,
    r.watchers_count,
    r.pushed_at,
    r.created_at,
    r.is_archived,
    r.is_disabled,
    r.is_fork,
    r.license,
    r.primary_language,
    (
        SELECT minimum_zig_version
        FROM releases
        WHERE repo_id = r.id
        ORDER BY published_at DESC
        LIMIT 1
    ) AS minimum_zig_version
"""

_REPO_JOIN_USERS = "LEFT JOIN users u ON r.owner = u.id"


def _provider_code(platform: Optional[str]) -> str:
    """Map a platform string to its short provider code.

    Returns "cb" only when the platform mentions codeberg (and not github);
    everything else, including a missing platform, maps to "gh".
    """
    platform_raw = (platform or "").lower()
    if "github" in platform_raw:
        return "gh"
    return "cb" if "codeberg" in platform_raw else "gh"


def get_default_branch_info(conn, repo_id: str, default_branch: str) -> Dict[str, Any]:
    """Get information about the default branch (build.zig.zon file).

    Uses the most recent release as a proxy for the default branch state.

    Returns:
        A dict with "minimum_zig_version", "dependencies" and "branch_name".
        When the repository has at least one release it also carries
        "latest_version" and "readme_url"; otherwise the minimum version is
        "0.0.0" and the dependency list is empty.
    """
    sql = """
        SELECT id, version, published_at, minimum_zig_version, readme_url, is_prerelease
        FROM releases
        WHERE repo_id = ?
        ORDER BY published_at DESC
        LIMIT 1
    """
    release = conn.execute(sql, (repo_id,)).fetchone()

    if not release:
        return {
            "minimum_zig_version": "0.0.0",
            "dependencies": [],
            "branch_name": default_branch,
        }

    release_id, version, _published_at, min_zig_ver, readme_url, _is_prerelease = release

    return {
        "minimum_zig_version": min_zig_ver or "0.0.0",
        "dependencies": _fetch_dependencies(conn, release_id),
        "branch_name": default_branch,
        "latest_version": version,
        "readme_url": readme_url,
    }


def get_version_info(conn, repo_id: str, version: str) -> Dict[str, Any]:
    """Get information about a specific version/release.

    Raises:
        HTTPException: If version not found.
    """
    sql = """
        SELECT id, version, published_at, minimum_zig_version, readme_url, is_prerelease
        FROM releases
        WHERE repo_id = ? AND version = ?
    """
    release = conn.execute(sql, (repo_id, version)).fetchone()

    if not release:
        raise HTTPException(
            status_code=404,
            detail=f"Version '{version}' not found for repository '{repo_id}'",
        )

    release_id, ver, published_at, min_zig_ver, readme_url, is_prerelease = release

    return {
        "version": ver,
        "published_at": str(published_at) if published_at else None,
        "minimum_zig_version": min_zig_ver or "0.0.0",
        "readme_url": readme_url,
        "is_prerelease": bool(is_prerelease),
        "dependencies": _fetch_dependencies(conn, release_id),
    }


def _fetch_dependencies(conn, release_id: int) -> List[Dict[str, Any]]:
    """Return all dependencies for a given release id.

    Note: the schema column is `is_lazy`, not `lazy`; the response key stays
    "lazy". A NULL `is_lazy` is reported as False.
    """
    sql = """
        SELECT name, url, hash, is_lazy, path
        FROM release_dependencies
        WHERE release_id = ?
    """
    cursor = conn.execute(sql, (release_id,))
    return [
        {
            "name": name,
            "url": url,
            "hash": dep_hash,
            "lazy": bool(is_lazy),  # bool(None) is False, covering NULL
            "path": path,
        }
        for name, url, dep_hash, is_lazy, path in cursor.fetchall()
    ]


def row_to_repo_dict(row: tuple) -> Dict[str, Any]:
    """Convert a database row to a repository dictionary.

    Expected column order (see _REPO_COLUMNS):
        0 id, 1 avatar_id, 2 owner, 3 platform, 4 description,
        5 issues_count, 6 default_branch_name, 7 fork_count,
        8 stargazer_count, 9 watchers_count, 10 pushed_at, 11 created_at,
        12 is_archived, 13 is_disabled, 14 is_fork, 15 license,
        16 primary_language, 17 minimum_zig_version, 18 dependents_count

    Note that the "avatar_url" key carries the avatar_id column value, and
    "repo_name" is the last "/"-separated segment of the id.
    """
    repo_id = row[0]
    return {
        "id": repo_id,
        "owner_name": row[2],
        "provider": _provider_code(row[3]),
        "repo_name": repo_id.rsplit("/", 1)[-1],
        "avatar_url": row[1],
        "owner": row[2],
        "platform": row[3],
        "description": row[4],
        "issues_count": row[5],
        "default_branch_name": row[6],
        "fork_count": row[7],
        "stargazer_count": row[8],
        "watchers_count": row[9],
        "pushed_at": str(row[10]) if row[10] else None,
        "created_at": str(row[11]) if row[11] else None,
        "is_archived": bool(row[12]),
        "is_disabled": bool(row[13]),
        "is_fork": bool(row[14]),
        "license": row[15],
        "primary_language": row[16],
        "minimum_zig_version": row[17] or "0.0.0",
        "dependents_count": row[18] if row[18] is not None else 0,
    }


def get_type_filter(search_type: str) -> str:
    """Generate SQL WHERE clause fragment based on search type.

    The fragment refers to the `pkg` and `prog` aliases, so the surrounding
    query must LEFT JOIN packages as `pkg` and programs as `prog`. Any value
    other than "package" or "program" is treated as "all".
    """
    if search_type == "package":
        return "AND pkg.repo_id IS NOT NULL"
    if search_type == "program":
        return "AND prog.repo_id IS NOT NULL"
    # all
    return "AND (pkg.repo_id IS NOT NULL OR prog.repo_id IS NOT NULL)"


def search_repos(
    conn, query: str, search_type: str = "all", per_page: int = 15, page: int = 1
) -> Dict[str, Any]:
    """Search repositories by query string with pagination.

    A repository matches when the full-text index matches every word of the
    query as a prefix, or when its id, owner, description or primary language
    contains the query as a substring. Exact id matches sort first, then ids
    starting with the query, then everything else; within each group rows are
    ordered by stars (descending) and then id, so paging is deterministic.

    `per_page` is clamped to 1..15 and `page` to the available page range.

    Returns:
        A dict with "items", "total", "page", "per_page", "total_pages",
        "has_previous_page" and "has_next_page".
    """
    actual_per_page = min(max(per_page, 1), 15)
    requested_page = max(page, 1)

    search_term_like = f"%{query}%"
    # Each word becomes a quoted prefix token; embedded quotes are doubled.
    fts_query = " ".join(
        '"' + word.replace('"', '""') + '"*' for word in query.split()
    )
    type_filter = get_type_filter(search_type)

    like_select = """
            SELECT id AS repo_id FROM repos
            WHERE id LIKE ?2
               OR owner LIKE ?2
               OR description LIKE ?2
               OR primary_language LIKE ?2
    """
    if fts_query:
        matched_ids_sql = (
            "SELECT repo_id FROM repo_search WHERE keywords MATCH ?1 UNION "
            + like_select
        )
    else:
        # A whitespace-only query yields no tokens; an empty MATCH expression
        # is not a valid full-text query, so only the LIKE branch is used.
        matched_ids_sql = like_select

    # Shared by the count query and the data query so both see the same set.
    filter_ctes = f"""
        WITH matched_ids AS (
            {matched_ids_sql}
        ),
        filtered_repos AS (
            SELECT DISTINCT r.id
            FROM repos r
            JOIN matched_ids mi ON r.id = mi.repo_id
            LEFT JOIN packages pkg ON r.id = pkg.repo_id
            LEFT JOIN programs prog ON r.id = prog.repo_id
            WHERE r.is_disabled = 0 {type_filter}
        )
    """

    count_sql = f"{filter_ctes} SELECT COUNT(*) FROM filtered_repos"
    total_count = conn.execute(count_sql, (fts_query, search_term_like)).fetchone()[0] or 0

    # Ceiling division; evaluates to 0 when there are no matches.
    total_pages = (total_count + actual_per_page - 1) // actual_per_page
    current_page = min(requested_page, total_pages) if total_pages > 0 else 1
    offset = (current_page - 1) * actual_per_page

    if total_count == 0:
        rows = []  # nothing to fetch; skip the second query
    else:
        data_sql = f"""
            {filter_ctes},
            repo_data AS (
                SELECT {_REPO_COLUMNS}
                FROM repos r
                {_REPO_JOIN_USERS}
                JOIN filtered_repos fr ON r.id = fr.id
            )
            SELECT rd.*,
                (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) AS dependents_count
            FROM repo_data rd
            ORDER BY
                CASE WHEN rd.id = ?3 THEN 1 WHEN rd.id LIKE ?4 THEN 2 ELSE 3 END,
                rd.stargazer_count DESC,
                rd.id ASC
            LIMIT ?5 OFFSET ?6
        """
        starts_with = f"{query}%"
        rows = conn.execute(
            data_sql,
            (fts_query, search_term_like, query, starts_with, actual_per_page, offset),
        ).fetchall()

    return {
        "items": [row_to_repo_dict(row) for row in rows],
        "total": total_count,
        "page": current_page,
        "per_page": actual_per_page,
        "total_pages": total_pages,
        "has_previous_page": current_page > 1,
        "has_next_page": current_page < total_pages,
    }


def get_latest_repos(
    conn, search_type: str = "all", limit: int = 10
) -> List[Dict[str, Any]]:
    """Get latest repositories ordered by creation date (newest first).

    Disabled repositories are excluded. The ordering is repeated on the outer
    SELECT because the order of rows read from a CTE is not guaranteed.
    """
    type_filter = get_type_filter(search_type)

    sql = f"""
        WITH repo_data AS (
            SELECT {_REPO_COLUMNS}
            FROM repos r
            {_REPO_JOIN_USERS}
            LEFT JOIN packages pkg ON r.id = pkg.repo_id
            LEFT JOIN programs prog ON r.id = prog.repo_id
            WHERE r.is_disabled = 0 {type_filter}
            ORDER BY r.created_at DESC
            LIMIT ?1
        )
        SELECT rd.*,
            (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) AS dependents_count
        FROM repo_data rd
        ORDER BY rd.created_at DESC
    """
    rows = conn.execute(sql, (limit,)).fetchall()
    return [row_to_repo_dict(row) for row in rows]


def get_scroll_repos(
    conn, search_type: str = "all", per_page: int = 20, page: int = 1
) -> List[Dict[str, Any]]:
    """Get paginated repositories ordered by star count.

    `per_page` is clamped to 1..20 (a non-positive LIMIT would otherwise
    return zero rows or, for negative values, every row) and `page` to >= 1.
    Ties on star count are broken by id so pages never overlap.
    """
    actual_per_page = min(max(per_page, 1), 20)
    offset = (max(page, 1) - 1) * actual_per_page
    type_filter = get_type_filter(search_type)

    sql = f"""
        WITH repo_data AS (
            SELECT {_REPO_COLUMNS}
            FROM repos r
            {_REPO_JOIN_USERS}
            LEFT JOIN packages pkg ON r.id = pkg.repo_id
            LEFT JOIN programs prog ON r.id = prog.repo_id
            WHERE r.is_disabled = 0 {type_filter}
            ORDER BY r.stargazer_count DESC, r.id ASC
            LIMIT ?1 OFFSET ?2
        )
        SELECT rd.*,
            (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) AS dependents_count
        FROM repo_data rd
        ORDER BY rd.stargazer_count DESC, rd.id ASC
    """
    rows = conn.execute(sql, (actual_per_page, offset)).fetchall()
    return [row_to_repo_dict(row) for row in rows]


def get_section_repos(conn, section_name: str, limit: int = 50) -> List[Dict[str, Any]]:
    """Get repositories for a specific index section.

    Only enabled repositories that are a package or a program are returned.
    NOTE(review): the query has no ORDER BY, so which rows survive the LIMIT
    and their order are up to the database — confirm whether index_sections
    should define an ordering.
    """
    sql = f"""
        WITH section_repos AS (
            SELECT repo_id FROM index_sections WHERE section_name = ?1
        ),
        repo_data AS (
            SELECT {_REPO_COLUMNS}
            FROM repos r
            {_REPO_JOIN_USERS}
            JOIN section_repos sr ON r.id = sr.repo_id
            LEFT JOIN packages pkg ON r.id = pkg.repo_id
            LEFT JOIN programs prog ON r.id = prog.repo_id
            WHERE r.is_disabled = 0
              AND (pkg.repo_id IS NOT NULL OR prog.repo_id IS NOT NULL)
            LIMIT ?2
        )
        SELECT rd.*,
            (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) AS dependents_count
        FROM repo_data rd
    """
    rows = conn.execute(sql, (section_name, limit)).fetchall()
    return [row_to_repo_dict(row) for row in rows]


def parse_repo_id(repo_id: str) -> tuple[str, str]:
    """Parse repo_id into owner and repo name.

    Accepts 'provider/owner/repo' or 'owner/repo'; the provider, when
    present, is discarded. Empty path segments are rejected.

    Raises:
        HTTPException: If repo_id format is invalid.
    """
    parts = repo_id.split("/")
    if len(parts) in (2, 3) and all(parts):
        return parts[-2], parts[-1]
    raise HTTPException(
        status_code=400,
        detail="Invalid repo_id format. Expected 'provider/owner/repo' or 'owner/repo'",
    )


def get_repo_details(
    conn, repo_id: str, version: Optional[str] = None
) -> Dict[str, Any]:
    """Get detailed information about a repository.

    - version omitted/empty → default branch info (latest release used as proxy)
    - version given         → that specific release's info

    The response always contains the repository columns, the list of release
    versions (newest first) and the list of dependents.

    Raises:
        HTTPException: If the id is malformed (400) or the repository or
            version is not found (404).
    """
    owner_name, repo_name = parse_repo_id(repo_id)

    # avatar_id is on users, not repos; minimum_zig_version comes from releases.
    repo_sql = f"""
        SELECT
            r.id, u.avatar_id, r.owner, r.platform, r.description,
            r.issues_count, r.default_branch_name, r.fork_count,
            r.stargazer_count, r.watchers_count, r.pushed_at, r.created_at,
            r.is_archived, r.is_disabled, r.is_fork, r.license,
            r.primary_language
        FROM repos r
        {_REPO_JOIN_USERS}
        WHERE r.id = ?
    """
    repo_row = conn.execute(repo_sql, (repo_id,)).fetchone()
    if not repo_row:
        raise HTTPException(status_code=404, detail="Repository not found")

    (
        r_id,
        avatar_id,
        owner,
        platform,
        desc,
        issues,
        default_branch,
        forks,
        stars,
        watchers,
        pushed_at,
        created_at,
        is_archived,
        is_disabled,
        is_fork,
        license_spdx,
        primary_language,
    ) = repo_row

    # All releases for this repo
    releases_sql = """
        SELECT version FROM releases
        WHERE repo_id = ?
        ORDER BY published_at DESC
    """
    cursor = conn.execute(releases_sql, (repo_id,))
    releases_list = [row[0] for row in cursor.fetchall() if row[0]]

    # Dependents (version-agnostic)
    cursor = conn.execute(
        "SELECT dependent FROM repo_dependents WHERE repo_id = ?", (repo_id,)
    )
    dependents = [row[0] for row in cursor.fetchall()]

    response = {
        "id": r_id,
        "avatar_id": avatar_id,
        "owner": owner,
        "platform": platform,
        "description": desc,
        "issues_count": issues,
        "default_branch_name": default_branch,
        "fork_count": forks,
        "stargazer_count": stars,
        "watchers_count": watchers,
        "pushed_at": str(pushed_at) if pushed_at else None,
        "created_at": str(created_at) if created_at else None,
        "is_archived": bool(is_archived),
        "is_disabled": bool(is_disabled),
        "is_fork": bool(is_fork),
        "license": license_spdx,
        "primary_language": primary_language,
        "provider_id": _provider_code(platform),
        "owner_name": owner_name,
        "repo_name": repo_name,
        "stars_count": stars,
        "forks_count": forks,
        "releases": releases_list,
        "dependents": dependents,
    }

    if version:
        version_info = get_version_info(conn, repo_id, version)
        response.update(
            {
                "version": version_info["version"],
                "published_at": version_info["published_at"],
                "minimum_zig_version": version_info["minimum_zig_version"],
                "readme_url": version_info["readme_url"],
                "is_prerelease": version_info["is_prerelease"],
                "dependencies": version_info["dependencies"],
                "requested_version": version,
            }
        )
    else:
        branch_info = get_default_branch_info(conn, repo_id, default_branch)
        response.update(
            {
                "minimum_zig_version": branch_info["minimum_zig_version"],
                "dependencies": branch_info["dependencies"],
                "branch_name": branch_info["branch_name"],
                "latest_version": branch_info.get("latest_version"),
                "readme_url": branch_info.get("readme_url"),
                "version": None,  # Indicates default branch, not a specific version
            }
        )

    return response


def get_user_details(conn, user_id: str) -> Dict[str, Any]:
    """Get detailed information about a user and their repositories.

    Returns the user's id, avatar_id, platform and bio plus two lists,
    "packages" and "programs", of their enabled repositories ordered by stars.
    A repository registered as both appears in both lists.

    Raises:
        HTTPException: If the user does not exist (404).
    """
    cursor = conn.execute(
        "SELECT id, avatar_id, platform, bio FROM users WHERE id = ?", (user_id,)
    )
    user_row = cursor.fetchone()
    if not user_row:
        raise HTTPException(status_code=404, detail="User not found")

    user_data = {
        "id": user_row[0],
        "avatar_id": user_row[1],
        "platform": user_row[2],
        "bio": user_row[3],
    }

    # avatar_id must come from users; minimum_zig_version from releases subquery.
    repos_sql = f"""
        SELECT
            {_REPO_COLUMNS},
            (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = r.id) AS dependents_count,
            pkg.repo_id AS is_package,
            prog.repo_id AS is_program
        FROM repos r
        {_REPO_JOIN_USERS}
        LEFT JOIN packages pkg ON r.id = pkg.repo_id
        LEFT JOIN programs prog ON r.id = prog.repo_id
        WHERE r.owner = ? AND r.is_disabled = 0
        ORDER BY r.stargazer_count DESC
    """
    rows = conn.execute(repos_sql, (user_id,)).fetchall()

    packages = []
    programs = []
    for row in rows:
        # Columns 0-18: the standard repo columns + dependents_count;
        # 19 and 20 are non-NULL when the repo is a package / a program.
        repo_dict = row_to_repo_dict(row[:19])
        is_package, is_program = row[19], row[20]
        if is_package:
            packages.append(repo_dict)
        if is_program:
            programs.append(repo_dict)

    user_data["packages"] = packages
    user_data["programs"] = programs
    return user_data


def check_db_connection():
    """Verify database connection is available.

    Raises:
        HTTPException: 503 if the global connection has not been initialized.
    """
    if not db_conn:
        raise HTTPException(status_code=503, detail="Database not initialized")


# ---------------------------------------------------------------------------
# API Endpoints
# ---------------------------------------------------------------------------


@app.get("/search/packages", tags=["Search"])
async def search_packages_endpoint(
    q: str = Query(..., min_length=1, description="Search query"),
    per_page: int = Query(15, ge=1, le=15, description="Items per page"),
    page: int = Query(1, ge=1, description="Page number"),
):
    """Search for Zig packages with pagination."""
    check_db_connection()
    return search_repos(db_conn, q, search_type="package", per_page=per_page, page=page)


@app.get("/search/programs", tags=["Search"])
async def search_programs_endpoint(
    q: str = Query(..., min_length=1, description="Search query"),
    per_page: int = Query(15, ge=1, le=15, description="Items per page"),
    page: int = Query(1, ge=1, description="Page number"),
):
    """Search for Zig programs with pagination."""
    check_db_connection()
    return search_repos(db_conn, q, search_type="program", per_page=per_page, page=page)


@app.get("/packageIndexDetails", tags=["Packages"])
async def get_package_index_details_endpoint():
    """
    All data required for the package index page.
    Includes top 10 latest packages, top 10 most-starred packages, and curated sections.
    """
    check_db_connection()
    return {
        "latest": get_latest_repos(db_conn, search_type="package", limit=10),
        "most_used": get_scroll_repos(db_conn, search_type="package", per_page=10, page=1),
        "games": get_section_repos(db_conn, "games", limit=50),
        "gui": get_section_repos(db_conn, "gui", limit=50),
        "web": get_section_repos(db_conn, "web", limit=50),
    }


@app.get("/programIndexDetails", tags=["Programs"])
async def get_program_index_details_endpoint():
    """
    All data required for the program index page.
    Includes top 10 latest programs and top 10 most-starred programs.
    """
    check_db_connection()
    return {
        "latest": get_latest_repos(db_conn, search_type="program", limit=10),
        "most_used": get_scroll_repos(db_conn, search_type="program", per_page=10, page=1),
    }


@app.get("/packages/scroll", tags=["Packages"])
async def scroll_packages_endpoint(
    per_page: int = Query(20, ge=1, le=20, description="Items per page"),
    page: int = Query(1, ge=1, description="Page number"),
):
    """Paginated list of packages sorted by stars."""
    check_db_connection()
    return get_scroll_repos(db_conn, search_type="package", per_page=per_page, page=page)


@app.get("/programs/scroll", tags=["Programs"])
async def scroll_programs_endpoint(
    per_page: int = Query(20, ge=1, le=20, description="Items per page"),
    page: int = Query(1, ge=1, description="Page number"),
):
    """Paginated list of programs sorted by stars."""
    check_db_connection()
    return get_scroll_repos(db_conn, search_type="program", per_page=per_page, page=page)


@app.get("/users/", tags=["Users"])
async def get_user_details_endpoint(
    q: str = Query(..., description="User ID (e.g., gh/username or cb/username)")
):
    """
    Detailed information about a user, including their packages and programs.
    """
    check_db_connection()
    return get_user_details(db_conn, q)


@app.get("/packages", tags=["Packages"])
async def get_package_details_endpoint(
    q: str = Query(
        ..., alias="q", description="Repository ID (owner/repo or provider/owner/repo)"
    ),
    version: Optional[str] = Query(
        None, description="Specific version to fetch (omit for default branch)"
    ),
):
    """
    Detailed information about a package.

    - Without version: returns default branch info (latest release used as proxy)
    - With version: returns that specific release's info

    Examples:
    - /packages?q=gh/rohanvashisht1234/zorsig
    - /packages?q=gh/rohanvashisht1234/zorsig&version=0.0.1
    """
    check_db_connection()
    return get_repo_details(db_conn, q, version)


@app.get("/programs", tags=["Programs"])
async def get_program_details_endpoint(
    q: str = Query(
        ..., alias="q", description="Repository ID (owner/repo or provider/owner/repo)"
    ),
    version: Optional[str] = Query(
        None, description="Specific version to fetch (omit for default branch)"
    ),
):
    """
    Detailed information about a program.

    - Without version: returns default branch info
    - With version: returns that specific release's info

    Examples:
    - /programs?q=gh/owner/program
    - /programs?q=gh/owner/program&version=1.0.0
    """
    check_db_connection()
    return get_repo_details(db_conn, q, version)


@app.get("/health", tags=["System"])
async def health_check():
    """Check API health status."""
    return {"status": "healthy", "database": "connected" if db_conn else "disconnected"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)