Spaces:
Sleeping
Sleeping
| import os | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import StreamingResponse, HTMLResponse | |
| import lancedb | |
| from sentence_transformers import SentenceTransformer | |
| from huggingface_hub import snapshot_download | |
| import shutil | |
| import requests | |
| import io | |
# Base URL for downloading raw files from the HF structured-data dataset.
HF_DATASET_BASE_URL = "https://huggingface.co/datasets/theodi/ndl-core-structured-data"
# HF REST API endpoint used to list the dataset's file tree.
HF_API_BASE_URL = "https://huggingface.co/api/datasets/theodi/ndl-core-structured-data"
# Public URL of this Space; used to build self-referencing download links.
THIS_API_URL = "https://theodi-ndl-core-data-api.hf.space"

app = FastAPI()
# 1. Download ONLY the LanceDB folder (saves space/time by ignoring FAISS).
print("β³ Downloading LanceDB index...")
index_path = snapshot_download(
    repo_id="theodi/ndl-core-rag-index",
    repo_type="dataset",
    allow_patterns="lancedb_search_index/*",  # only need this folder, not the FAISS one
    force_download=True,  # ensure we get the latest version
)

# Copying out of the HF cache is mandatory to avoid "file size is too small"
# errors from LanceDB (the cache can hold symlinked/partial blobs).
dst = "/tmp/lancedb_search_index"
# Remove any stale copy from a previous run: copytree raises if dst exists,
# and merging over stale files could mix old and new index versions.
if os.path.exists(dst):
    shutil.rmtree(dst)
shutil.copytree(f"{index_path}/lancedb_search_index", dst)

# Verify files copied
for root, dirs, files in os.walk(dst):
    for f in files:
        p = os.path.join(root, f)
        print(p, os.path.getsize(p))

# 2. Connect DB and load model
db = lancedb.connect(dst)
table = db.open_table("ndl_core_datasets")
all_columns = table.schema.names
# Exclude the raw embedding vector from everything the API returns.
columns_to_select = [col for col in all_columns if col != "vector"]
model = SentenceTransformer('all-MiniLM-L6-v2')
def search(query: str, limit: int = 5):
    """Run a semantic vector search over the LanceDB table.

    Args:
        query: Free-text query; embedded with the sentence-transformer model.
        limit: Maximum number of results to return.

    Returns:
        A list of result dicts (vector column excluded), each with a
        truncated ``text`` preview and a ``download`` list of URLs.
    """
    embedding = model.encode(query)
    # Vector search; the metric must match the one the index was built with.
    builder = table.search(embedding).metric("cosine")
    # Explicit column selection keeps the embedding vector out of responses.
    hits = builder.select(columns_to_select).limit(limit).to_pandas()

    # Keep only a short preview of the (potentially large) text column.
    if "text" in hits.columns:
        hits["text"] = hits["text"].apply(truncate_text)

    # Attach download URLs to every result.
    records = hits.to_dict(orient='records')
    for record in records:
        record["download"] = generate_download_info(record)
    return records
def download_text_file(identifier: str):
    """
    Stream text content as a downloadable file.

    Args:
        identifier: The record identifier

    Returns:
        StreamingResponse with the text content as a downloadable file

    Raises:
        HTTPException: 404 if no record matches the identifier,
            400 if the matched record is not in "text" format.
    """
    record = find_record_by_identifier(identifier)
    if record is None:
        raise HTTPException(status_code=404, detail=f"No record found with identifier: {identifier}")

    record_format = record.get("format", "")
    if record_format != "text":
        raise HTTPException(status_code=400, detail=f"Record is not text format: {record_format}")

    text_data = record.get("text", "")
    # Serve the text from an in-memory buffer; nothing touches disk.
    file_stream = io.BytesIO(text_data.encode("utf-8"))
    return StreamingResponse(
        file_stream,
        media_type="text/plain",
        headers={
            # Quote the filename so identifiers containing spaces or other
            # special characters still yield a valid header (RFC 6266).
            "Content-Disposition": f'attachment; filename="{identifier}.txt"'
        }
    )
def truncate_text(text: str, max_length: int = 100) -> str:
    """Return first max_length characters of text with '...' if truncated, or empty string if no text."""
    if not text:
        return ""
    # Short enough already: hand back unchanged; otherwise cut and mark.
    return text if len(text) <= max_length else text[:max_length] + "..."
def get_folder_file_urls(folder_name: str) -> list:
    """Fetch all file URLs from a folder in the HuggingFace dataset.

    Best-effort: returns an empty list on any network/API failure, matching
    the existing behaviour for non-200 responses.
    """
    api_url = f"{HF_API_BASE_URL}/tree/main/{folder_name}"
    try:
        # Timeout so an unresponsive HF API cannot hang the request forever.
        response = requests.get(api_url, timeout=30)
    except requests.RequestException:
        return []
    if response.status_code != 200:
        return []
    try:
        files = response.json()
    except ValueError:
        # Non-JSON body (e.g. an HTML error page) — treat as no files.
        return []
    file_urls = []
    for file_info in files:
        # Only direct files; sub-directory entries are skipped.
        if file_info.get("type") == "file":
            file_path = file_info.get("path", "")
            file_urls.append(f"{HF_DATASET_BASE_URL}/resolve/main/{file_path}")
    return file_urls
def find_record_by_identifier(identifier: str):
    """Search for a record in LanceDB by exact identifier.

    Args:
        identifier: The identifier value to match.

    Returns:
        The first matching row (pandas Series), or None if no match.
    """
    # Escape single quotes so a quote inside the (caller-supplied) identifier
    # cannot break out of the SQL-style filter expression (filter injection).
    safe_identifier = identifier.replace("'", "''")
    results = (
        table.search()
        .where(f"identifier = '{safe_identifier}'")
        .select(columns_to_select)
        .limit(1)
        .to_pandas()
    )
    return results.iloc[0] if not results.empty else None
def generate_download_info(record: dict) -> list:
    """Generate download URLs for a search result record.

    Text records link to this API's /download/text endpoint; parquet records
    link to HF directly (one file, or every file in a folder). Any other
    format yields an empty list.
    """
    identifier = record.get("identifier", "")
    record_format = record.get("format", "")

    if record_format == "text":
        # Text lives in the index itself, so serve it through this API.
        return [f"{THIS_API_URL}/download/text/{identifier}"]

    if record_format == "parquet":
        data_file = record.get("data_file", "")
        if not data_file:
            return []
        if data_file.endswith(".parquet"):
            # A single parquet file: link straight to the HF blob.
            return [f"{HF_DATASET_BASE_URL}/resolve/main/{data_file}"]
        # Otherwise data_file is a folder (UUID) - list every file inside it.
        return get_folder_file_urls(data_file)

    return []
# Root endpoint with HTML response
# NOTE(review): no @app.get("/", response_class=HTMLResponse) decorator is
# visible here — confirm it was not lost, otherwise this handler is never routed.
def root():
    """Return the static HTML landing page describing the API's endpoints."""
    return """
<!DOCTYPE html>
<html>
<head>
<title>NDL Core Data API</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 720px;
margin: 40px auto;
line-height: 1.6;
}
code {
background: #f4f4f4;
padding: 2px 6px;
border-radius: 4px;
}
</style>
</head>
<body>
<h1>NDL Core Data API</h1>
<p>
This Space provides a <strong>FastAPI-based service</strong> for semantic search
and data download across NDL Core datasets.
</p>
<h3>Key Endpoints</h3>
<ul>
<li><code>GET /search</code> β Semantic search over NDL Core datasets</li>
<li><code>GET /download/text/{identifier}</code> β Download dataset text files</li>
</ul>
<p>
For detailed usage examples, parameters, and data definitions,
see the full project README:
</p>
<p>
π <a href="https://huggingface.co/spaces/theodi/ndl-core-data-api/blob/main/README.md" target="_blank">
Project README
</a>
</p>
<h3>Client Library</h3>
<p>
To easily interact with this API, use the official Python client library with built-in MCP server support:
</p>
<p>
π <a href="https://github.com/theodi/ndl-core-client" target="_blank">
ndl-core-client
</a>
</p>
</body>
</html>
"""