zigistryBackend / main.py
RohanVashisht's picture
formatted files
7e44512
raw
history blame
22 kB
import libsql
import os
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI, Query, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional, List, Dict, Any
from contextlib import asynccontextmanager
from datetime import datetime
# Global database connection.
# Starts as None; lifespan() assigns the libsql connection during startup and
# the endpoints guard on it through check_db_connection() before querying.
db_conn = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifecycle - initialize DB on startup, close it on shutdown.

    Reads ``DATABASE_URL`` and ``API_KEY`` from the environment, opens the local
    ``zigistry-main.db`` file through libsql with ``sync_url`` pointing at the
    remote database, calls ``sync()`` once, and publishes the connection via
    the module-level ``db_conn``.

    Args:
        app: The FastAPI application this lifespan is attached to (not used
            inside the function body).

    Raises:
        RuntimeError: If either environment variable is missing, or if
            connecting/syncing fails (the original error is chained as the
            cause).
    """
    global db_conn
    url = os.getenv("DATABASE_URL")
    auth_token = os.getenv("API_KEY")
    if not url or not auth_token:
        raise RuntimeError("DATABASE_URL and API_KEY environment variables must be set")
    try:
        db_conn = libsql.connect(
            "zigistry-main.db", sync_url=url, auth_token=auth_token
        )
        db_conn.sync()
        print("Database connection established successfully")
    except Exception as e:
        # Chain the cause so the startup traceback shows the underlying failure.
        raise RuntimeError(f"Failed to connect to database: {e}") from e
    try:
        yield
    finally:
        # Cleanup on shutdown. The finally block makes sure the connection is
        # closed even when an exception is thrown into the generator at `yield`.
        if db_conn:
            try:
                db_conn.close()
            except Exception as e:
                # Best effort: a failing close must not mask the shutdown itself.
                print(f"Error closing database connection: {e}")
            finally:
                # Never leave the global pointing at a closed connection.
                db_conn = None
# Application instance. title/description/version are handed to FastAPI as API
# metadata; `lifespan` opens the database connection before the app starts
# serving and closes it on shutdown.
app = FastAPI(
    title="Zigistry API",
    description="API for searching and browsing Zig packages and programs",
    version="1.0.0",
    lifespan=lifespan,
)
# CORS middleware
# The API is open to every origin, method and header. Credentials are turned
# off because a wildcard origin must not be combined with credentialed
# requests (the FastAPI CORS documentation requires explicit origins when
# allow_credentials=True), and the endpoints visible in this file do not read
# cookies or authorization headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
def get_default_branch_info(conn, repo_id: str, default_branch: str) -> Dict[str, Any]:
    """Get information about the default branch (build.zig.zon file).

    The most recently published release is used as a proxy for the default
    branch; the branch's own build.zig.zon is not fetched here.

    Args:
        conn: Database connection exposing ``execute(sql, params)``.
        repo_id: Repository identifier.
        default_branch: Name of the default branch (echoed back as ``branch_name``).

    Returns:
        Dictionary with ``minimum_zig_version``, ``dependencies`` and
        ``branch_name``. When the repository has at least one release it also
        carries ``latest_version`` and ``readme_url``; without any release the
        minimum version is "0.0.0" and the dependency list is empty.
    """
    # Only the columns that are actually used are selected.
    release_sql = """
        SELECT
            id,
            version,
            minimum_zig_version,
            readme_url
        FROM releases
        WHERE repo_id = ?
        ORDER BY published_at DESC
        LIMIT 1
    """
    release = conn.execute(release_sql, (repo_id,)).fetchone()
    if not release:
        return {
            "minimum_zig_version": "0.0.0",
            "dependencies": [],
            "branch_name": default_branch,
        }
    release_id, version, min_zig_ver, readme_url = release

    # Get dependencies for this release
    deps_sql = """
        SELECT name, url, hash, lazy, path
        FROM release_dependencies
        WHERE release_id = ?
    """
    dep_rows = conn.execute(deps_sql, (release_id,)).fetchall()
    dependencies = [
        {
            "name": name,
            "url": url,
            "hash": dep_hash,
            # NULL and 0 both mean "not lazy".
            "lazy": bool(lazy),
            "path": path,
        }
        for name, url, dep_hash, lazy, path in dep_rows
    ]
    return {
        "minimum_zig_version": min_zig_ver or "0.0.0",
        "dependencies": dependencies,
        "branch_name": default_branch,
        "latest_version": version,
        "readme_url": readme_url,
    }
def get_version_info(conn, repo_id: str, version: str) -> Dict[str, Any]:
    """Look up one release of a repository together with its dependencies.

    Args:
        conn: Database connection exposing ``execute(sql, params)``.
        repo_id: Repository identifier.
        version: Version string of the release to load.

    Returns:
        Dictionary with ``version``, ``published_at`` (string or None),
        ``minimum_zig_version`` ("0.0.0" when unset), ``readme_url``,
        ``is_prerelease`` and the ``dependencies`` list.

    Raises:
        HTTPException: 404 if the repository has no release with that version.
    """
    release_row = conn.execute(
        """
        SELECT
            id,
            version,
            published_at,
            minimum_zig_version,
            readme_url,
            is_prerelease
        FROM releases
        WHERE repo_id = ? AND version = ?
        """,
        (repo_id, version),
    ).fetchone()
    if not release_row:
        raise HTTPException(
            status_code=404,
            detail=f"Version '{version}' not found for repository '{repo_id}'",
        )
    release_id, found_version, published, zig_floor, readme, prerelease = release_row

    # Load the dependency rows recorded for this release.
    dep_rows = conn.execute(
        """
        SELECT name, url, hash, lazy, path
        FROM release_dependencies
        WHERE release_id = ?
        """,
        (release_id,),
    ).fetchall()
    dep_keys = ("name", "url", "hash", "lazy", "path")
    dependencies = []
    for dep_row in dep_rows:
        dependency = dict(zip(dep_keys, dep_row))
        # The lazy column may be NULL/0/1; expose it as a plain boolean.
        dependency["lazy"] = bool(dependency["lazy"])
        dependencies.append(dependency)

    published_text = None if not published else str(published)
    return {
        "version": found_version,
        "published_at": published_text,
        "minimum_zig_version": zig_floor or "0.0.0",
        "readme_url": readme,
        "is_prerelease": bool(prerelease),
        "dependencies": dependencies,
    }
def row_to_repo_dict(row: tuple) -> Dict[str, Any]:
    """Turn one repository result row into the dictionary returned by the API.

    The row is read positionally: the 18 repository columns in the order the
    listing queries select them, followed by the dependents count at index 18.
    """

    def text_or_none(value):
        # Falsy timestamps (NULL, empty) are reported as None, everything else as text.
        return str(value) if value else None

    # Only a platform mentioning Codeberg (and not GitHub) maps to "cb";
    # GitHub and any unknown or missing platform map to "gh".
    platform_lower = (row[3] or "").lower()
    if "codeberg" in platform_lower and "github" not in platform_lower:
        provider = "cb"
    else:
        provider = "gh"

    # The repository name is whatever follows the last "/" of the id.
    full_id = row[0]
    short_name = full_id.rsplit("/", 1)[-1]

    dependents = row[18]
    return {
        "id": full_id,
        "owner_name": row[2],
        "provider": provider,
        "repo_name": short_name,
        "avatar_url": row[1],
        "owner": row[2],
        "platform": row[3],
        "description": row[4],
        "issues_count": row[5],
        "default_branch_name": row[6],
        "fork_count": row[7],
        "stargazer_count": row[8],
        "watchers_count": row[9],
        "pushed_at": text_or_none(row[10]),
        "created_at": text_or_none(row[11]),
        "is_archived": bool(row[12]),
        "is_disabled": bool(row[13]),
        "is_fork": bool(row[14]),
        "license": row[15],
        "primary_language": row[16],
        "minimum_zig_version": row[17] or "0.0.0",
        "dependents_count": 0 if dependents is None else dependents,
    }
def get_type_filter(search_type: str) -> str:
    """Return the extra SQL ``AND ...`` condition for the requested repository kind.

    The returned fragment refers to the ``pkg`` and ``prog`` join aliases used
    by the listing queries. Any value other than "package" or "program" is
    treated as "all".
    """
    packages_only = "AND pkg.repo_id IS NOT NULL"
    programs_only = "AND prog.repo_id IS NOT NULL"
    either_kind = "AND (pkg.repo_id IS NOT NULL OR prog.repo_id IS NOT NULL)"

    if search_type == "program":
        return programs_only
    if search_type == "package":
        return packages_only
    return either_kind
def search_repos(
    conn, query: str, search_type: str = "all", limit: int = 50
) -> List[Dict[str, Any]]:
    """Search repositories by query string.

    Candidates come from the ``repo_search`` full-text index (prefix match on
    every word of the query) unioned with a substring match on id, owner,
    description and primary language. Disabled repositories are excluded.
    Results are ranked: exact id match first, then ids starting with the
    query, then everything else; each group is ordered by star count.

    Args:
        conn: Database connection exposing ``execute(sql, params)``.
        query: Search query string.
        search_type: Filter by 'package', 'program', or 'all'.
        limit: Maximum number of results to return.

    Returns:
        List of repository dictionaries with default branch minimum_zig_version.
        A blank or whitespace-only query returns an empty list.
    """
    # FTS5 query with prefix matching for each word; embedded double quotes are
    # doubled so each word stays one quoted FTS5 string.
    fts_query = " ".join(
        '"' + word.replace('"', '""') + '"*' for word in query.split()
    )
    if not fts_query:
        # A whitespace-only query has no search terms, and an empty string is
        # not a usable FTS5 MATCH expression, so answer with no results
        # instead of letting the statement fail.
        return []

    search_term_like = f"%{query}%"
    starts_with = f"{query}%"
    # The filter is one of the fixed fragments from get_type_filter, never user
    # input, so interpolating it into the statement is safe.
    type_filter = get_type_filter(search_type)
    sql = f"""
        WITH matched_ids AS (
            SELECT repo_id FROM repo_search WHERE keywords MATCH ?1
            UNION
            SELECT id FROM repos WHERE id LIKE ?2 OR owner LIKE ?2 OR description LIKE ?2 OR primary_language LIKE ?2
        ),
        repo_data AS (
            SELECT
                r.id,
                r.avatar_id,
                r.owner,
                r.platform,
                r.description,
                r.issues_count,
                r.default_branch_name,
                r.fork_count,
                r.stargazer_count,
                r.watchers_count,
                r.pushed_at,
                r.created_at,
                r.is_archived,
                r.is_disabled,
                r.is_fork,
                r.license,
                r.primary_language,
                r.minimum_zig_version
            FROM repos r
            JOIN matched_ids mi ON r.id = mi.repo_id
            LEFT JOIN packages pkg ON r.id = pkg.repo_id
            LEFT JOIN programs prog ON r.id = prog.repo_id
            WHERE
                r.is_disabled = 0
                {type_filter}
        )
        SELECT
            rd.*,
            (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) as dependents_count
        FROM repo_data rd
        ORDER BY
            CASE
                WHEN rd.id = ?3 THEN 1
                WHEN rd.id LIKE ?4 THEN 2
                ELSE 3
            END,
            rd.stargazer_count DESC
        LIMIT ?5
    """
    # Parameters match ?1 (FTS), ?2 (LIKE), ?3 (Exact), ?4 (Starts with), ?5 (Limit)
    cursor = conn.execute(sql, (fts_query, search_term_like, query, starts_with, limit))
    return [row_to_repo_dict(row) for row in cursor.fetchall()]
def get_latest_repos(
    conn, search_type: str = "all", limit: int = 10
) -> List[Dict[str, Any]]:
    """Get latest repositories ordered by creation date.

    Disabled repositories are excluded. The newest ``limit`` repositories are
    picked inside the CTE; the outer query repeats the ordering because SQL
    does not guarantee that a CTE's row order survives into the enclosing
    SELECT.

    Args:
        conn: Database connection exposing ``execute(sql, params)``.
        search_type: Filter by 'package', 'program', or 'all'.
        limit: Maximum number of results to return.

    Returns:
        List of repository dictionaries with default branch minimum_zig_version,
        newest first.
    """
    # The filter is one of the fixed fragments from get_type_filter, never user
    # input, so interpolating it into the statement is safe.
    type_filter = get_type_filter(search_type)
    sql = f"""
        WITH repo_data AS (
            SELECT
                r.id,
                r.avatar_id,
                r.owner,
                r.platform,
                r.description,
                r.issues_count,
                r.default_branch_name,
                r.fork_count,
                r.stargazer_count,
                r.watchers_count,
                r.pushed_at,
                r.created_at,
                r.is_archived,
                r.is_disabled,
                r.is_fork,
                r.license,
                r.primary_language,
                r.minimum_zig_version
            FROM repos r
            LEFT JOIN packages pkg ON r.id = pkg.repo_id
            LEFT JOIN programs prog ON r.id = prog.repo_id
            WHERE
                r.is_disabled = 0
                {type_filter}
            ORDER BY r.created_at DESC
            LIMIT ?1
        )
        SELECT
            rd.*,
            (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) as dependents_count
        FROM repo_data rd
        ORDER BY rd.created_at DESC
    """
    cursor = conn.execute(sql, (limit,))
    return [row_to_repo_dict(row) for row in cursor.fetchall()]
def get_scroll_repos(
    conn, search_type: str = "all", per_page: int = 20, page: int = 1
) -> List[Dict[str, Any]]:
    """Get paginated repositories ordered by star count.

    Disabled repositories are excluded. Ties on star count are broken by id so
    that pages are stable. The outer query repeats the ordering because SQL
    does not guarantee that a CTE's row order survives into the enclosing
    SELECT.

    Args:
        conn: Database connection exposing ``execute(sql, params)``.
        search_type: Filter by 'package', 'program', or 'all'.
        per_page: Number of items per page (clamped to the range 0..20).
        page: Page number (1-indexed; values below 1 are treated as 1).

    Returns:
        List of repository dictionaries with default branch minimum_zig_version.
    """
    # Clamp on both sides: SQLite treats a negative LIMIT as "no limit", so a
    # negative per_page must not reach the statement.
    actual_per_page = max(0, min(per_page, 20))
    offset = (max(page, 1) - 1) * actual_per_page
    # The filter is one of the fixed fragments from get_type_filter, never user
    # input, so interpolating it into the statement is safe.
    type_filter = get_type_filter(search_type)
    sql = f"""
        WITH repo_data AS (
            SELECT
                r.id,
                r.avatar_id,
                r.owner,
                r.platform,
                r.description,
                r.issues_count,
                r.default_branch_name,
                r.fork_count,
                r.stargazer_count,
                r.watchers_count,
                r.pushed_at,
                r.created_at,
                r.is_archived,
                r.is_disabled,
                r.is_fork,
                r.license,
                r.primary_language,
                r.minimum_zig_version
            FROM repos r
            LEFT JOIN packages pkg ON r.id = pkg.repo_id
            LEFT JOIN programs prog ON r.id = prog.repo_id
            WHERE
                r.is_disabled = 0
                {type_filter}
            ORDER BY r.stargazer_count DESC, r.id ASC
            LIMIT ?1 OFFSET ?2
        )
        SELECT
            rd.*,
            (SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) as dependents_count
        FROM repo_data rd
        ORDER BY rd.stargazer_count DESC, rd.id ASC
    """
    cursor = conn.execute(sql, (actual_per_page, offset))
    return [row_to_repo_dict(row) for row in cursor.fetchall()]
def parse_repo_id(repo_id: str) -> tuple[str, str]:
    """Split a repository id into its owner and repository name.

    Args:
        repo_id: Repository ID in format 'owner/repo' or 'provider/owner/repo'.

    Returns:
        Tuple of (owner_name, repo_name), i.e. the last two "/"-separated
        segments of the id.

    Raises:
        HTTPException: 400 if the id does not have exactly two or three segments.
    """
    segments = repo_id.split("/")
    if len(segments) not in (2, 3):
        raise HTTPException(
            status_code=400,
            detail="Invalid repo_id format. Expected 'provider/owner/repo' or 'owner/repo'",
        )
    # With three segments the leading one is the provider and is dropped.
    owner_name, repo_name = segments[-2:]
    return owner_name, repo_name
def get_repo_details(
    conn, repo_id: str, version: Optional[str] = None
) -> Dict[str, Any]:
    """Assemble the full detail payload for one repository.

    Args:
        conn: Database connection exposing ``execute(sql, params)``.
        repo_id: Repository identifier.
        version: Optional specific version to fetch details for.

    Returns:
        Dictionary with the repository columns, the list of release versions
        and the list of dependents, extended with:
        - If version is falsy: default branch information with dependencies
          (``version`` is None in that case).
        - If version is given: that release's information with dependencies,
          plus ``requested_version``.

    Raises:
        HTTPException: 400 for a malformed id, 404 if the repository or the
            requested version is not found.
    """
    # Splitting the id first rejects a malformed id before any query runs.
    owner_name, repo_name = parse_repo_id(repo_id)

    # Repository columns
    repo_row = conn.execute(
        """
        SELECT
            id, avatar_id, owner, platform, description, issues_count,
            default_branch_name, fork_count, stargazer_count, watchers_count,
            pushed_at, created_at, is_archived, is_disabled, is_fork,
            license, primary_language, minimum_zig_version
        FROM repos WHERE id = ?
        """,
        (repo_id,),
    ).fetchone()
    if not repo_row:
        raise HTTPException(status_code=404, detail="Repository not found")
    (
        found_id,
        avatar,
        owner_login,
        platform_name,
        description,
        open_issues,
        branch,
        fork_total,
        star_total,
        watcher_total,
        last_push,
        created,
        archived,
        disabled,
        forked,
        license_id,
        language,
        repo_min_zig,
    ) = repo_row

    # Only a platform mentioning Codeberg (and not GitHub) maps to "cb";
    # GitHub and any unknown or missing platform map to "gh".
    platform_lower = (platform_name or "").lower()
    if "codeberg" in platform_lower and "github" not in platform_lower:
        provider_id = "cb"
    else:
        provider_id = "gh"

    # Every release version, newest first; rows with an empty version are skipped.
    release_rows = conn.execute(
        """
        SELECT version
        FROM releases
        WHERE repo_id = ?
        ORDER BY published_at DESC
        """,
        (repo_id,),
    ).fetchall()
    releases_list = [release_row[0] for release_row in release_rows if release_row[0]]

    # Dependents are the same regardless of the requested version.
    dependent_rows = conn.execute(
        "SELECT dependent FROM repo_dependents WHERE repo_id = ?", (repo_id,)
    ).fetchall()
    dependents = [dependent_row[0] for dependent_row in dependent_rows]

    # Part of the payload shared by both the version and default-branch cases.
    base = {
        "id": found_id,
        "avatar_id": avatar,
        "owner": owner_login,
        "platform": platform_name,
        "description": description,
        "issues_count": open_issues,
        "default_branch_name": branch,
        "fork_count": fork_total,
        "stargazer_count": star_total,
        "watchers_count": watcher_total,
        "pushed_at": str(last_push) if last_push else None,
        "created_at": str(created) if created else None,
        "is_archived": bool(archived),
        "is_disabled": bool(disabled),
        "is_fork": bool(forked),
        "license": license_id,
        "primary_language": language,
        "provider_id": provider_id,
        "owner_name": owner_name,
        "repo_name": repo_name,
        "stars_count": star_total,
        "forks_count": fork_total,
        "releases": releases_list,
        "dependents": dependents,
    }

    if not version:
        # No version requested: describe the default branch.
        branch_info = get_default_branch_info(conn, repo_id, branch)
        extra = {
            # The repository-level minimum wins over the release-derived one.
            "minimum_zig_version": repo_min_zig or branch_info["minimum_zig_version"],
            "dependencies": branch_info["dependencies"],
            "branch_name": branch_info["branch_name"],
            "latest_version": branch_info.get("latest_version"),
            "readme_url": branch_info.get("readme_url"),
            "version": None,  # Indicates default branch, not a specific version
        }
    else:
        # A specific release was requested; a missing one raises 404 in the helper.
        version_info = get_version_info(conn, repo_id, version)
        extra = {
            "version": version_info["version"],
            "published_at": version_info["published_at"],
            "minimum_zig_version": version_info["minimum_zig_version"],
            "readme_url": version_info["readme_url"],
            "is_prerelease": version_info["is_prerelease"],
            "dependencies": version_info["dependencies"],
            "requested_version": version,
        }
    return {**base, **extra}
def check_db_connection():
    """Ensure the global database connection exists before a request uses it.

    Raises:
        HTTPException: 503 when ``db_conn`` has not been set.
    """
    if db_conn:
        return
    raise HTTPException(status_code=503, detail="Database not initialized")
# API Endpoints
@app.get("/search/packages", tags=["Search"])
async def search_packages_endpoint(
    q: str = Query(..., min_length=1, description="Search query"),
    limit: int = Query(50, ge=1, le=100, description="Maximum results"),
):
    """Search for Zig packages. Returns default branch minimum_zig_version for each result."""
    # Answer 503 instead of querying when the database was never opened.
    check_db_connection()
    matches = search_repos(db_conn, query=q, search_type="package", limit=limit)
    return matches
@app.get("/search/programs", tags=["Search"])
async def search_programs_endpoint(
    q: str = Query(..., min_length=1, description="Search query"),
    limit: int = Query(50, ge=1, le=100, description="Maximum results"),
):
    """Search for Zig programs. Returns default branch minimum_zig_version for each result."""
    # Answer 503 instead of querying when the database was never opened.
    check_db_connection()
    matches = search_repos(db_conn, query=q, search_type="program", limit=limit)
    return matches
@app.get("/packages/latest", tags=["Packages"])
async def get_latest_packages_endpoint(
    limit: int = Query(10, ge=1, le=50, description="Number of packages"),
):
    """Get latest Zig packages. Returns default branch minimum_zig_version for each result."""
    # Answer 503 instead of querying when the database was never opened.
    check_db_connection()
    newest = get_latest_repos(db_conn, "package", limit)
    return newest
@app.get("/programs/latest", tags=["Programs"])
async def get_latest_programs_endpoint(
    limit: int = Query(10, ge=1, le=50, description="Number of programs"),
):
    """Get latest Zig programs. Returns default branch minimum_zig_version for each result."""
    # Answer 503 instead of querying when the database was never opened.
    check_db_connection()
    newest = get_latest_repos(db_conn, "program", limit)
    return newest
@app.get("/packages/scroll", tags=["Packages"])
async def scroll_packages_endpoint(
    per_page: int = Query(20, ge=1, le=20, description="Items per page"),
    page: int = Query(1, ge=1, description="Page number"),
):
    """Get paginated list of packages sorted by stars. Returns default branch minimum_zig_version for each result."""
    # Answer 503 instead of querying when the database was never opened.
    check_db_connection()
    page_items = get_scroll_repos(db_conn, "package", per_page, page)
    return page_items
@app.get("/programs/scroll", tags=["Programs"])
async def scroll_programs_endpoint(
    per_page: int = Query(20, ge=1, le=20, description="Items per page"),
    page: int = Query(1, ge=1, description="Page number"),
):
    """Get paginated list of programs sorted by stars. Returns default branch minimum_zig_version for each result."""
    # Answer 503 instead of querying when the database was never opened.
    check_db_connection()
    page_items = get_scroll_repos(db_conn, "program", per_page, page)
    return page_items
@app.get("/packages", tags=["Packages"])
async def get_package_details_endpoint(
    q: str = Query(
        ..., alias="q", description="Repository ID (owner/repo or provider/owner/repo)"
    ),
    version: Optional[str] = Query(
        None, description="Specific version to fetch (omit for default branch)"
    ),
):
    """
    Get detailed information about a package.
    - Without version parameter: Returns default branch information with dependencies
    - With version parameter: Returns that specific version's information with dependencies
    Examples:
    - /packages?q=gh/rohanvashisht1234/zorsig (default branch)
    - /packages?q=gh/rohanvashisht1234/zorsig&version=0.0.1 (specific version)
    """
    # Answer 503 instead of querying when the database was never opened.
    check_db_connection()
    # The `q` query parameter carries the repository id.
    details = get_repo_details(db_conn, repo_id=q, version=version)
    return details
@app.get("/programs", tags=["Programs"])
async def get_program_details_endpoint(
    q: str = Query(
        ..., alias="q", description="Repository ID (owner/repo or provider/owner/repo)"
    ),
    version: Optional[str] = Query(
        None, description="Specific version to fetch (omit for default branch)"
    ),
):
    """
    Get detailed information about a program.
    - Without version parameter: Returns default branch information with dependencies
    - With version parameter: Returns that specific version's information with dependencies
    Examples:
    - /programs?q=gh/owner/program (default branch)
    - /programs?q=gh/owner/program&version=1.0.0 (specific version)
    """
    # Answer 503 instead of querying when the database was never opened.
    check_db_connection()
    # The `q` query parameter carries the repository id.
    details = get_repo_details(db_conn, repo_id=q, version=version)
    return details
@app.get("/health", tags=["System"])
async def health_check():
    """Check API health status."""
    # Reports whether the global connection is set; no query is issued here.
    database_state = "connected" if db_conn else "disconnected"
    return {"status": "healthy", "database": database_state}
# Script entry point: when the file is run directly, serve the app with
# uvicorn on all interfaces at port 7860.
# NOTE(review): 7860 is presumably the port the hosting platform expects - confirm.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)