import os
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, HTMLResponse
import lancedb
from sentence_transformers import SentenceTransformer
from huggingface_hub import snapshot_download
import shutil
import requests
import io

# Public HuggingFace endpoints for the structured-data dataset, plus the
# public URL of this Space (used to build /download links in responses).
HF_DATASET_BASE_URL = "https://huggingface.co/datasets/theodi/ndl-core-structured-data"
HF_API_BASE_URL = "https://huggingface.co/api/datasets/theodi/ndl-core-structured-data"
THIS_API_URL = "https://theodi-ndl-core-data-api.hf.space"

app = FastAPI()

# 1. Download ONLY the LanceDB folder (saves space/time by ignoring FAISS)
print("⏳ Downloading LanceDB index...")
index_path = snapshot_download(
    repo_id="theodi/ndl-core-rag-index",
    repo_type="dataset",
    allow_patterns="lancedb_search_index/*",  # only need this folder, not the FAISS one
    force_download=True,  # ensure we get the latest version
)

# Copying out of the HF cache is mandatory to avoid "file size is too small"
# errors from LanceDB (presumably because the cache exposes the files in a
# way LanceDB cannot read directly — confirmed empirically, see comment above).
# Remove any leftover copy first so copytree cannot fail with
# FileExistsError when the container restarts.
dst = "/tmp/lancedb_search_index"
shutil.rmtree(dst, ignore_errors=True)
shutil.copytree(f"{index_path}/lancedb_search_index", dst)

# Verify files copied: print every file and its size to the startup log.
for walk_root, _dirs, walk_files in os.walk(dst):
    for fname in walk_files:
        fpath = os.path.join(walk_root, fname)
        print(fpath, os.path.getsize(fpath))

# 2.
# 2. Connect DB and load the embedding model.
db = lancedb.connect(dst)
table = db.open_table("ndl_core_datasets")
all_columns = table.schema.names
# Exclude the raw embedding vector from API responses.
columns_to_select = [col for col in all_columns if col != "vector"]
model = SentenceTransformer('all-MiniLM-L6-v2')


@app.get("/search")
def search(query: str, limit: int = 5):
    """Semantic search over the NDL Core index.

    Args:
        query: Free-text query, embedded with the MiniLM model.
        limit: Maximum number of results to return.

    Returns:
        A list of result records; each has its ``text`` field truncated to a
        preview and a ``download`` list of URLs attached.
    """
    query_vector = model.encode(query)
    results = (
        table.search(query_vector)      # vector search
        .metric("cosine")               # ensure metric matches the index
        .select(columns_to_select)      # explicit column selection (no vector)
        .limit(limit)
        .to_pandas()
    )
    # Truncate text column to preview only
    if "text" in results.columns:
        results["text"] = results["text"].apply(truncate_text)
    # Add download links to each result
    records = results.to_dict(orient='records')
    for record in records:
        record["download"] = generate_download_info(record)
    return records


@app.get("/download/text/{identifier}")
def download_text_file(identifier: str):
    """
    Stream text content as a downloadable file.

    Args:
        identifier: The record identifier

    Returns:
        StreamingResponse with the text content as a downloadable file

    Raises:
        HTTPException: 404 if no record matches the identifier,
            400 if the record's format is not "text".
    """
    record = find_record_by_identifier(identifier)
    if record is None:
        raise HTTPException(status_code=404, detail=f"No record found with identifier: {identifier}")
    record_format = record.get("format", "")
    if record_format != "text":
        raise HTTPException(status_code=400, detail=f"Record is not text format: {record_format}")
    text_data = record.get("text", "")
    # Create a file-like object from the text
    file_stream = io.BytesIO(text_data.encode("utf-8"))
    # Quote the filename and strip quote/CR/LF characters: the identifier is
    # untrusted path input, and interpolating it bare into the header allows
    # malformed or injected Content-Disposition parameters (RFC 6266).
    safe_name = identifier.replace('"', "").replace("\r", "").replace("\n", "")
    return StreamingResponse(
        file_stream,
        media_type="text/plain",
        headers={
            "Content-Disposition": f'attachment; filename="{safe_name}.txt"'
        },
    )


def truncate_text(text: str, max_length: int = 100) -> str:
    """Return first max_length characters of text with '...' if truncated,
    or empty string if no text."""
    if not text:
        return ""
    if len(text) <= max_length:
        return text
    return text[:max_length] + "..."
def get_folder_file_urls(folder_name: str) -> list:
    """Fetch all file URLs from a folder in the HuggingFace dataset.

    Args:
        folder_name: Path of the folder inside the dataset repo.

    Returns:
        A list of resolve-URL strings; empty if the folder listing cannot
        be fetched.
    """
    api_url = f"{HF_API_BASE_URL}/tree/main/{folder_name}"
    try:
        # Timeout so one slow/hung HF call cannot block a worker forever.
        response = requests.get(api_url, timeout=30)
    except requests.RequestException:
        # Treat network failure like a non-200: no URLs available.
        return []
    if response.status_code != 200:
        return []
    files = response.json()
    file_urls = []
    for file_info in files:
        if file_info.get("type") == "file":
            file_path = file_info.get("path", "")
            download_url = f"{HF_DATASET_BASE_URL}/resolve/main/{file_path}"
            file_urls.append(download_url)
    return file_urls


def find_record_by_identifier(identifier: str):
    """Search for a record in LanceDB by identifier.

    Returns:
        The first matching row (pandas Series) or None if no match.
    """
    # Double any single quotes so a quote inside the identifier cannot break
    # out of (or inject into) the SQL-like filter expression.
    safe_identifier = identifier.replace("'", "''")
    results = (
        table.search()
        .where(f"identifier = '{safe_identifier}'")
        .select(columns_to_select)
        .limit(1)
        .to_pandas()
    )
    return results.iloc[0] if not results.empty else None


def generate_download_info(record: dict) -> list:
    """Generate download URLs for a search result record.

    text records -> one URL pointing at this API's /download/text endpoint;
    parquet records -> direct HF resolve URL(s): a single file if data_file
    ends in .parquet, otherwise every file in the named folder.
    Unknown formats -> empty list.
    """
    identifier = record.get("identifier", "")
    record_format = record.get("format", "")
    if record_format == "text":
        download_url = f"{THIS_API_URL}/download/text/{identifier}"
        return [download_url]
    elif record_format == "parquet":
        data_file = record.get("data_file", "")
        if not data_file:
            return []
        if data_file.endswith(".parquet"):
            download_url = f"{HF_DATASET_BASE_URL}/resolve/main/{data_file}"
            return [download_url]
        # It's a folder (UUID) - fetch all files in the folder
        return get_folder_file_urls(data_file)
    else:
        return []


# Root endpoint with HTML response
@app.get("/", response_class=HTMLResponse, include_in_schema=False)
def root():
    return """ NDL Core Data API

NDL Core Data API

This Space provides a FastAPI-based service for semantic search and data download across NDL Core datasets.

Key Endpoints

For detailed usage examples, parameters, and data definitions, see the full project README:

👉 Project README

Client Library

To easily interact with this API, use the official Python client library with built-in MCP server support:

👉 ndl-core-client

"""