zigistryBackend / main.py
RohanVashisht's picture
Update main.py
794484e verified
import libsql
import os
import asyncio
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI, Query, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional, List, Dict, Any
from contextlib import asynccontextmanager
from datetime import datetime
# Global database connection
db_conn = None
async def sync_periodically(interval_seconds: int = 3600):
"""Sync the database every `interval_seconds`."""
while True:
await asyncio.sleep(interval_seconds)
try:
db_conn.sync()
print(f"Database synced at {datetime.now()}")
except Exception as e:
print(f"Sync error: {e}")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifecycle - initialize DB on startup."""
global db_conn
url = os.getenv("DATABASE_URL")
auth_token = os.getenv("API_KEY")
if not url or not auth_token:
raise RuntimeError("DATABASE_URL and API_KEY environment variables must be set")
try:
db_conn = libsql.connect(
"zigistry-main.db", sync_url=url, auth_token=auth_token
)
db_conn.sync()
print("Database connection established successfully")
except Exception as e:
raise RuntimeError(f"Failed to connect to database: {e}")
sync_task = asyncio.create_task(sync_periodically(3600))
yield
sync_task.cancel()
try:
await sync_task
except asyncio.CancelledError:
pass
# Cleanup on shutdown
if db_conn:
try:
db_conn.close()
except Exception as e:
print(f"Error closing database connection: {e}")
app = FastAPI(
title="Zigistry API",
description="API for searching and browsing Zig packages and programs",
version="1.0.0",
lifespan=lifespan,
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# SQL fragment reused across all list/search queries.
# Selects every column row_to_repo_dict() expects, in the correct order:
# 0 id
# 1 avatar_id ← joined from users (repos has no avatar_id column)
# 2 owner
# 3 platform
# 4 description
# 5 issues_count
# 6 default_branch_name
# 7 fork_count
# 8 stargazer_count
# 9 watchers_count
# 10 pushed_at
# 11 created_at
# 12 is_archived
# 13 is_disabled
# 14 is_fork
# 15 license
# 16 primary_language
# 17 minimum_zig_version ← subquery from releases (repos has no such column)
# ---------------------------------------------------------------------------
_REPO_COLUMNS = """
r.id,
u.avatar_id,
r.owner,
r.platform,
r.description,
r.issues_count,
r.default_branch_name,
r.fork_count,
r.stargazer_count,
r.watchers_count,
r.pushed_at,
r.created_at,
r.is_archived,
r.is_disabled,
r.is_fork,
r.license,
r.primary_language,
(
SELECT minimum_zig_version
FROM releases
WHERE repo_id = r.id
ORDER BY published_at DESC
LIMIT 1
) AS minimum_zig_version
"""
_REPO_JOIN_USERS = "LEFT JOIN users u ON r.owner = u.id"
def get_default_branch_info(conn, repo_id: str, default_branch: str) -> Dict[str, Any]:
"""Get information about the default branch (build.zig.zon file).
Uses the most recent release as a proxy for the default branch state.
"""
sql = """
SELECT
id,
version,
published_at,
minimum_zig_version,
readme_url,
is_prerelease
FROM releases
WHERE repo_id = ?
ORDER BY published_at DESC
LIMIT 1
"""
cursor = conn.execute(sql, (repo_id,))
release = cursor.fetchone()
if not release:
return {
"minimum_zig_version": "0.0.0",
"dependencies": [],
"branch_name": default_branch,
}
release_id, version, published_at, min_zig_ver, readme_url, is_prerelease = release
dependencies = _fetch_dependencies(conn, release_id)
return {
"minimum_zig_version": min_zig_ver or "0.0.0",
"dependencies": dependencies,
"branch_name": default_branch,
"latest_version": version,
"readme_url": readme_url,
}
def get_version_info(conn, repo_id: str, version: str) -> Dict[str, Any]:
"""Get information about a specific version/release.
Raises:
HTTPException: If version not found.
"""
sql = """
SELECT
id,
version,
published_at,
minimum_zig_version,
readme_url,
is_prerelease
FROM releases
WHERE repo_id = ? AND version = ?
"""
cursor = conn.execute(sql, (repo_id, version))
release = cursor.fetchone()
if not release:
raise HTTPException(
status_code=404,
detail=f"Version '{version}' not found for repository '{repo_id}'",
)
release_id, ver, published_at, min_zig_ver, readme_url, is_prerelease = release
dependencies = _fetch_dependencies(conn, release_id)
return {
"version": ver,
"published_at": str(published_at) if published_at else None,
"minimum_zig_version": min_zig_ver or "0.0.0",
"readme_url": readme_url,
"is_prerelease": bool(is_prerelease),
"dependencies": dependencies,
}
def _fetch_dependencies(conn, release_id: int) -> List[Dict[str, Any]]:
"""Return all dependencies for a given release id.
Note: the schema column is `is_lazy`, not `lazy`.
"""
sql = """
SELECT name, url, hash, is_lazy, path
FROM release_dependencies
WHERE release_id = ?
"""
cursor = conn.execute(sql, (release_id,))
return [
{
"name": row[0],
"url": row[1],
"hash": row[2],
"lazy": bool(row[3]) if row[3] is not None else False,
"path": row[4],
}
for row in cursor.fetchall()
]
def row_to_repo_dict(row: tuple) -> Dict[str, Any]:
"""Convert a database row to a repository dictionary.
Expected column order (see _REPO_COLUMNS):
0 id
1 avatar_id
2 owner
3 platform
4 description
5 issues_count
6 default_branch_name
7 fork_count
8 stargazer_count
9 watchers_count
10 pushed_at
11 created_at
12 is_archived
13 is_disabled
14 is_fork
15 license
16 primary_language
17 minimum_zig_version
18 dependents_count
"""
platform_raw = (row[3] or "").lower()
provider = (
"gh"
if "github" in platform_raw
else ("cb" if "codeberg" in platform_raw else "gh")
)
repo_id = row[0]
repo_name = repo_id.split("/")[-1] if "/" in repo_id else repo_id
return {
"id": row[0],
"owner_name": row[2],
"provider": provider,
"repo_name": repo_name,
"avatar_url": row[1],
"owner": row[2],
"platform": row[3],
"description": row[4],
"issues_count": row[5],
"default_branch_name": row[6],
"fork_count": row[7],
"stargazer_count": row[8],
"watchers_count": row[9],
"pushed_at": str(row[10]) if row[10] else None,
"created_at": str(row[11]) if row[11] else None,
"is_archived": bool(row[12]),
"is_disabled": bool(row[13]),
"is_fork": bool(row[14]),
"license": row[15],
"primary_language": row[16],
"minimum_zig_version": row[17] if row[17] else "0.0.0",
"dependents_count": row[18] if row[18] is not None else 0,
}
def get_type_filter(search_type: str) -> str:
"""Generate SQL WHERE clause fragment based on search type."""
if search_type == "package":
return "AND pkg.repo_id IS NOT NULL"
elif search_type == "program":
return "AND prog.repo_id IS NOT NULL"
else: # all
return "AND (pkg.repo_id IS NOT NULL OR prog.repo_id IS NOT NULL)"
def search_repos(
conn, query: str, search_type: str = "all", per_page: int = 15, page: int = 1
) -> Dict[str, Any]:
"""Search repositories by query string with pagination."""
actual_per_page = min(max(per_page, 1), 15)
requested_page = max(page, 1)
search_term_like = f"%{query}%"
fts_query = " ".join(
'"' + word.replace('"', '""') + '"*' for word in query.split() if word
)
type_filter = get_type_filter(search_type)
count_sql = f"""
WITH matched_ids AS (
SELECT repo_id FROM repo_search WHERE keywords MATCH ?1
UNION
SELECT id FROM repos
WHERE id LIKE ?2 OR owner LIKE ?2 OR description LIKE ?2 OR primary_language LIKE ?2
),
filtered_repos AS (
SELECT DISTINCT r.id
FROM repos r
JOIN matched_ids mi ON r.id = mi.repo_id
LEFT JOIN packages pkg ON r.id = pkg.repo_id
LEFT JOIN programs prog ON r.id = prog.repo_id
WHERE
r.is_disabled = 0
{type_filter}
)
SELECT COUNT(*) FROM filtered_repos
"""
total_count = conn.execute(count_sql, (fts_query, search_term_like)).fetchone()[0] or 0
total_pages = (
(total_count + actual_per_page - 1) // actual_per_page if total_count > 0 else 0
)
current_page = min(requested_page, total_pages) if total_pages > 0 else 1
offset = (current_page - 1) * actual_per_page
sql = f"""
WITH matched_ids AS (
SELECT repo_id FROM repo_search WHERE keywords MATCH ?1
UNION
SELECT id FROM repos
WHERE id LIKE ?2 OR owner LIKE ?2 OR description LIKE ?2 OR primary_language LIKE ?2
),
filtered_repos AS (
SELECT DISTINCT r.id
FROM repos r
JOIN matched_ids mi ON r.id = mi.repo_id
LEFT JOIN packages pkg ON r.id = pkg.repo_id
LEFT JOIN programs prog ON r.id = prog.repo_id
WHERE
r.is_disabled = 0
{type_filter}
),
repo_data AS (
SELECT
{_REPO_COLUMNS}
FROM repos r
{_REPO_JOIN_USERS}
JOIN filtered_repos fr ON r.id = fr.id
)
SELECT
rd.*,
(SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) AS dependents_count
FROM repo_data rd
ORDER BY
CASE
WHEN rd.id = ?3 THEN 1
WHEN rd.id LIKE ?4 THEN 2
ELSE 3
END,
rd.stargazer_count DESC
LIMIT ?5 OFFSET ?6
"""
starts_with = f"{query}%"
cursor = conn.execute(
sql, (fts_query, search_term_like, query, starts_with, actual_per_page, offset)
)
rows = cursor.fetchall()
items = [row_to_repo_dict(row) for row in rows]
return {
"items": items,
"total": total_count,
"page": current_page,
"per_page": actual_per_page,
"total_pages": total_pages,
"has_previous_page": current_page > 1,
"has_next_page": current_page < total_pages,
}
def get_latest_repos(
conn, search_type: str = "all", limit: int = 10
) -> List[Dict[str, Any]]:
"""Get latest repositories ordered by creation date."""
type_filter = get_type_filter(search_type)
sql = f"""
WITH repo_data AS (
SELECT
{_REPO_COLUMNS}
FROM repos r
{_REPO_JOIN_USERS}
LEFT JOIN packages pkg ON r.id = pkg.repo_id
LEFT JOIN programs prog ON r.id = prog.repo_id
WHERE
r.is_disabled = 0
{type_filter}
ORDER BY r.created_at DESC
LIMIT ?1
)
SELECT
rd.*,
(SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) AS dependents_count
FROM repo_data rd
"""
cursor = conn.execute(sql, (limit,))
rows = cursor.fetchall()
return [row_to_repo_dict(row) for row in rows]
def get_scroll_repos(
conn, search_type: str = "all", per_page: int = 20, page: int = 1
) -> List[Dict[str, Any]]:
"""Get paginated repositories ordered by star count."""
actual_per_page = min(per_page, 20)
offset = (max(page, 1) - 1) * actual_per_page
type_filter = get_type_filter(search_type)
sql = f"""
WITH repo_data AS (
SELECT
{_REPO_COLUMNS}
FROM repos r
{_REPO_JOIN_USERS}
LEFT JOIN packages pkg ON r.id = pkg.repo_id
LEFT JOIN programs prog ON r.id = prog.repo_id
WHERE
r.is_disabled = 0
{type_filter}
ORDER BY r.stargazer_count DESC, r.id ASC
LIMIT ?1 OFFSET ?2
)
SELECT
rd.*,
(SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) AS dependents_count
FROM repo_data rd
"""
cursor = conn.execute(sql, (actual_per_page, offset))
rows = cursor.fetchall()
return [row_to_repo_dict(row) for row in rows]
def get_section_repos(conn, section_name: str, limit: int = 50) -> List[Dict[str, Any]]:
"""Get repositories for a specific index section."""
sql = f"""
WITH section_repos AS (
SELECT repo_id FROM index_sections WHERE section_name = ?1
),
repo_data AS (
SELECT
{_REPO_COLUMNS}
FROM repos r
{_REPO_JOIN_USERS}
JOIN section_repos sr ON r.id = sr.repo_id
LEFT JOIN packages pkg ON r.id = pkg.repo_id
LEFT JOIN programs prog ON r.id = prog.repo_id
WHERE
r.is_disabled = 0
AND (pkg.repo_id IS NOT NULL OR prog.repo_id IS NOT NULL)
LIMIT ?2
)
SELECT
rd.*,
(SELECT COUNT(*) FROM repo_dependents WHERE repo_id = rd.id) AS dependents_count
FROM repo_data rd
"""
cursor = conn.execute(sql, (section_name, limit))
rows = cursor.fetchall()
return [row_to_repo_dict(row) for row in rows]
def parse_repo_id(repo_id: str) -> tuple[str, str]:
"""Parse repo_id into owner and repo name.
Raises:
HTTPException: If repo_id format is invalid.
"""
parts = repo_id.split("/")
if len(parts) == 3:
return parts[1], parts[2]
elif len(parts) == 2:
return parts[0], parts[1]
else:
raise HTTPException(
status_code=400,
detail="Invalid repo_id format. Expected 'provider/owner/repo' or 'owner/repo'",
)
def get_repo_details(
conn, repo_id: str, version: Optional[str] = None
) -> Dict[str, Any]:
"""Get detailed information about a repository.
- version=None → default branch info (latest release used as proxy)
- version=<tag> → that specific release's info
Raises:
HTTPException: If repository or version not found.
"""
owner_name, repo_name = parse_repo_id(repo_id)
# avatar_id is on users, not repos; minimum_zig_version comes from releases.
repo_sql = f"""
SELECT
r.id,
u.avatar_id,
r.owner,
r.platform,
r.description,
r.issues_count,
r.default_branch_name,
r.fork_count,
r.stargazer_count,
r.watchers_count,
r.pushed_at,
r.created_at,
r.is_archived,
r.is_disabled,
r.is_fork,
r.license,
r.primary_language
FROM repos r
{_REPO_JOIN_USERS}
WHERE r.id = ?
"""
cursor = conn.execute(repo_sql, (repo_id,))
repo_row = cursor.fetchone()
if not repo_row:
raise HTTPException(status_code=404, detail="Repository not found")
(
r_id,
avatar_id,
owner,
platform,
desc,
issues,
default_branch,
forks,
stars,
watchers,
pushed_at,
created_at,
is_archived,
is_disabled,
is_fork,
license_spdx,
primary_language,
) = repo_row
platform_raw = (platform or "").lower()
provider_id = (
"gh"
if "github" in platform_raw
else ("cb" if "codeberg" in platform_raw else "gh")
)
# All releases for this repo
releases_sql = """
SELECT version
FROM releases
WHERE repo_id = ?
ORDER BY published_at DESC
"""
cursor = conn.execute(releases_sql, (repo_id,))
releases_list = [row[0] for row in cursor.fetchall() if row[0]]
# Dependents (version-agnostic)
cursor = conn.execute(
"SELECT dependent FROM repo_dependents WHERE repo_id = ?", (repo_id,)
)
dependents = [row[0] for row in cursor.fetchall()]
response = {
"id": r_id,
"avatar_id": avatar_id,
"owner": owner,
"platform": platform,
"description": desc,
"issues_count": issues,
"default_branch_name": default_branch,
"fork_count": forks,
"stargazer_count": stars,
"watchers_count": watchers,
"pushed_at": str(pushed_at) if pushed_at else None,
"created_at": str(created_at) if created_at else None,
"is_archived": bool(is_archived),
"is_disabled": bool(is_disabled),
"is_fork": bool(is_fork),
"license": license_spdx,
"primary_language": primary_language,
"provider_id": provider_id,
"owner_name": owner_name,
"repo_name": repo_name,
"stars_count": stars,
"forks_count": forks,
"releases": releases_list,
"dependents": dependents,
}
if version:
version_info = get_version_info(conn, repo_id, version)
response.update(
{
"version": version_info["version"],
"published_at": version_info["published_at"],
"minimum_zig_version": version_info["minimum_zig_version"],
"readme_url": version_info["readme_url"],
"is_prerelease": version_info["is_prerelease"],
"dependencies": version_info["dependencies"],
"requested_version": version,
}
)
else:
branch_info = get_default_branch_info(conn, repo_id, default_branch)
response.update(
{
"minimum_zig_version": branch_info["minimum_zig_version"],
"dependencies": branch_info["dependencies"],
"branch_name": branch_info["branch_name"],
"latest_version": branch_info.get("latest_version"),
"readme_url": branch_info.get("readme_url"),
"version": None, # Indicates default branch, not a specific version
}
)
return response
def get_user_details(conn, user_id: str) -> Dict[str, Any]:
"""Get detailed information about a user and their repositories."""
cursor = conn.execute(
"SELECT id, avatar_id, platform, bio FROM users WHERE id = ?", (user_id,)
)
user_row = cursor.fetchone()
if not user_row:
raise HTTPException(status_code=404, detail="User not found")
user_data = {
"id": user_row[0],
"avatar_id": user_row[1],
"platform": user_row[2],
"bio": user_row[3],
}
# avatar_id must come from users; minimum_zig_version from releases subquery.
repos_sql = f"""
SELECT
{_REPO_COLUMNS},
(SELECT COUNT(*) FROM repo_dependents WHERE repo_id = r.id) AS dependents_count,
pkg.repo_id AS is_package,
prog.repo_id AS is_program
FROM repos r
{_REPO_JOIN_USERS}
LEFT JOIN packages pkg ON r.id = pkg.repo_id
LEFT JOIN programs prog ON r.id = prog.repo_id
WHERE r.owner = ? AND r.is_disabled = 0
ORDER BY r.stargazer_count DESC
"""
cursor = conn.execute(repos_sql, (user_id,))
rows = cursor.fetchall()
packages = []
programs = []
for row in rows:
# Columns 0-18: the standard repo columns + dependents_count
repo_dict = row_to_repo_dict(row[:19])
is_package = row[19]
is_program = row[20]
if is_package:
packages.append(repo_dict)
if is_program:
programs.append(repo_dict)
user_data["packages"] = packages
user_data["programs"] = programs
return user_data
def check_db_connection():
"""Verify database connection is available."""
if not db_conn:
raise HTTPException(status_code=503, detail="Database not initialized")
# ---------------------------------------------------------------------------
# API Endpoints
# ---------------------------------------------------------------------------
@app.get("/search/packages", tags=["Search"])
async def search_packages_endpoint(
q: str = Query(..., min_length=1, description="Search query"),
per_page: int = Query(15, ge=1, le=15, description="Items per page"),
page: int = Query(1, ge=1, description="Page number"),
):
"""Search for Zig packages with pagination."""
check_db_connection()
return search_repos(db_conn, q, search_type="package", per_page=per_page, page=page)
@app.get("/search/programs", tags=["Search"])
async def search_programs_endpoint(
q: str = Query(..., min_length=1, description="Search query"),
per_page: int = Query(15, ge=1, le=15, description="Items per page"),
page: int = Query(1, ge=1, description="Page number"),
):
"""Search for Zig programs with pagination."""
check_db_connection()
return search_repos(db_conn, q, search_type="program", per_page=per_page, page=page)
@app.get("/packageIndexDetails", tags=["Packages"])
async def get_package_index_details_endpoint():
"""
All data required for the package index page.
Includes top 10 latest packages, top 10 most-starred packages, and curated sections.
"""
check_db_connection()
return {
"latest": get_latest_repos(db_conn, search_type="package", limit=10),
"most_used": get_scroll_repos(db_conn, search_type="package", per_page=10, page=1),
"games": get_section_repos(db_conn, "games", limit=50),
"gui": get_section_repos(db_conn, "gui", limit=50),
"web": get_section_repos(db_conn, "web", limit=50),
}
@app.get("/programIndexDetails", tags=["Programs"])
async def get_program_index_details_endpoint():
"""
All data required for the program index page.
Includes top 10 latest programs and top 10 most-starred programs.
"""
check_db_connection()
return {
"latest": get_latest_repos(db_conn, search_type="program", limit=10),
"most_used": get_scroll_repos(db_conn, search_type="program", per_page=10, page=1),
}
@app.get("/packages/scroll", tags=["Packages"])
async def scroll_packages_endpoint(
per_page: int = Query(20, ge=1, le=20, description="Items per page"),
page: int = Query(1, ge=1, description="Page number"),
):
"""Paginated list of packages sorted by stars."""
check_db_connection()
return get_scroll_repos(db_conn, search_type="package", per_page=per_page, page=page)
@app.get("/programs/scroll", tags=["Programs"])
async def scroll_programs_endpoint(
per_page: int = Query(20, ge=1, le=20, description="Items per page"),
page: int = Query(1, ge=1, description="Page number"),
):
"""Paginated list of programs sorted by stars."""
check_db_connection()
return get_scroll_repos(db_conn, search_type="program", per_page=per_page, page=page)
@app.get("/users/", tags=["Users"])
async def get_user_details_endpoint(
q: str = Query(..., description="User ID (e.g., gh/username or cb/username)")
):
"""
Detailed information about a user, including their packages and programs.
"""
check_db_connection()
return get_user_details(db_conn, q)
@app.get("/packages", tags=["Packages"])
async def get_package_details_endpoint(
q: str = Query(
..., alias="q", description="Repository ID (owner/repo or provider/owner/repo)"
),
version: Optional[str] = Query(
None, description="Specific version to fetch (omit for default branch)"
),
):
"""
Detailed information about a package.
- Without version: returns default branch info (latest release used as proxy)
- With version: returns that specific release's info
Examples:
- /packages?q=gh/rohanvashisht1234/zorsig
- /packages?q=gh/rohanvashisht1234/zorsig&version=0.0.1
"""
check_db_connection()
return get_repo_details(db_conn, q, version)
@app.get("/programs", tags=["Programs"])
async def get_program_details_endpoint(
q: str = Query(
..., alias="q", description="Repository ID (owner/repo or provider/owner/repo)"
),
version: Optional[str] = Query(
None, description="Specific version to fetch (omit for default branch)"
),
):
"""
Detailed information about a program.
- Without version: returns default branch info
- With version: returns that specific release's info
Examples:
- /programs?q=gh/owner/program
- /programs?q=gh/owner/program&version=1.0.0
"""
check_db_connection()
return get_repo_details(db_conn, q, version)
@app.get("/health", tags=["System"])
async def health_check():
"""Check API health status."""
return {"status": "healthy", "database": "connected" if db_conn else "disconnected"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)