# Author: Huseyin Kir
# lib mcp support added (commit 3857228)
import os
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, HTMLResponse
import lancedb
from sentence_transformers import SentenceTransformer
from huggingface_hub import snapshot_download
import shutil
import requests
import io
HF_DATASET_BASE_URL = "https://huggingface.co/datasets/theodi/ndl-core-structured-data"
HF_API_BASE_URL = "https://huggingface.co/api/datasets/theodi/ndl-core-structured-data"
THIS_API_URL = "https://theodi-ndl-core-data-api.hf.space"
app = FastAPI()
# 1. Download ONLY the LanceDB folder (saves space/time by ignoring FAISS)
print("⏳ Downloading LanceDB index...")
index_path = snapshot_download(
    repo_id="theodi/ndl-core-rag-index",
    repo_type="dataset",
    allow_patterns="lancedb_search_index/*",  # only need this folder, not the FAISS one
    force_download=True,  # ensure we get the latest version
)
# Copying out of the HF cache is mandatory to avoid "file size is too small"
# errors from LanceDB.
dst = "/tmp/lancedb_search_index"
# dirs_exist_ok=True makes the copy idempotent: a plain copytree raises
# FileExistsError when the app restarts and /tmp still holds the old copy.
shutil.copytree(f"{index_path}/lancedb_search_index", dst, dirs_exist_ok=True)
# Verify files copied: log every path and its size for startup debugging.
for root, dirs, files in os.walk(dst):
    for f in files:
        p = os.path.join(root, f)
        print(p, os.path.getsize(p))
# 2. Connect DB and load model
db = lancedb.connect(dst)
table = db.open_table("ndl_core_datasets")
all_columns = table.schema.names
# Exclude the raw embedding vector from API responses (large and not useful to clients).
columns_to_select = [col for col in all_columns if col != "vector"]
model = SentenceTransformer('all-MiniLM-L6-v2')
@app.get("/search")
def search(query: str, limit: int = 5):
    """Semantic search over the NDL Core datasets.

    Encodes *query* with the sentence-transformer model, runs a cosine
    vector search against the LanceDB table, and returns up to *limit*
    records with a truncated text preview and download links attached.
    """
    embedding = model.encode(query)
    # Vector search; the metric must match the one the index was built with.
    hits = table.search(embedding).metric("cosine")
    frame = hits.select(columns_to_select).limit(limit).to_pandas()
    # Replace full text with a short preview before serializing.
    if "text" in frame.columns:
        frame["text"] = frame["text"].apply(truncate_text)
    # Attach download URL(s) to every result record.
    records = frame.to_dict(orient='records')
    for rec in records:
        rec["download"] = generate_download_info(rec)
    return records
@app.get("/download/text/{identifier}")
def download_text_file(identifier: str):
    """
    Stream text content as a downloadable file.

    Args:
        identifier: The record identifier

    Returns:
        StreamingResponse with the text content as a downloadable file

    Raises:
        HTTPException: 404 if no record matches the identifier,
            400 if the matched record is not text format.
    """
    record = find_record_by_identifier(identifier)
    if record is None:
        raise HTTPException(status_code=404, detail=f"No record found with identifier: {identifier}")
    record_format = record.get("format", "")
    if record_format != "text":
        raise HTTPException(status_code=400, detail=f"Record is not text format: {record_format}")
    text_data = record.get("text", "")
    # The record comes back from pandas, so a missing value can surface as
    # NaN (a float); guard so .encode() below cannot crash on a non-string.
    if not isinstance(text_data, str):
        text_data = ""
    # Create a file-like object from the text
    file_stream = io.BytesIO(text_data.encode("utf-8"))
    return StreamingResponse(
        file_stream,
        media_type="text/plain",
        headers={
            # Quote the filename so identifiers containing spaces or special
            # characters still yield a valid Content-Disposition header.
            "Content-Disposition": f'attachment; filename="{identifier}.txt"'
        }
    )
def truncate_text(text: str, max_length: int = 100) -> str:
    """Return first max_length characters of text with '...' if truncated, or empty string if no text."""
    if not text:
        return ""
    return text if len(text) <= max_length else text[:max_length] + "..."
def get_folder_file_urls(folder_name: str) -> list:
    """Fetch all file download URLs from a folder in the HuggingFace dataset.

    Args:
        folder_name: Path of the folder inside the dataset repository.

    Returns:
        A list of resolve-URLs, one per file directly inside the folder;
        an empty list on any HTTP failure (best-effort, matching the
        original non-200 behavior).
    """
    api_url = f"{HF_API_BASE_URL}/tree/main/{folder_name}"
    try:
        # A timeout prevents a hung HF API call from blocking the request
        # worker forever; RequestException covers connect/read errors too.
        response = requests.get(api_url, timeout=30)
    except requests.RequestException:
        return []
    if response.status_code != 200:
        return []
    files = response.json()
    file_urls = []
    for file_info in files:
        if file_info.get("type") == "file":
            file_path = file_info.get("path", "")
            file_urls.append(f"{HF_DATASET_BASE_URL}/resolve/main/{file_path}")
    return file_urls
def find_record_by_identifier(identifier: str):
    """Search for a record in LanceDB by identifier.

    Args:
        identifier: The record identifier to look up.

    Returns:
        The first matching row (pandas Series) or None if no match.
    """
    # Escape single quotes so a quoted identifier cannot break out of the
    # SQL-style filter expression (guards against injection / syntax errors
    # since `identifier` arrives straight from the URL path).
    safe_identifier = identifier.replace("'", "''")
    results = (
        table.search()
        .where(f"identifier = '{safe_identifier}'")
        .select(columns_to_select)
        .limit(1)
        .to_pandas()
    )
    return results.iloc[0] if not results.empty else None
def generate_download_info(record: dict) -> list:
    """Generate download URLs for a search result record.

    Text records link back to this API's /download/text endpoint; parquet
    records link directly to the HuggingFace dataset (a single file URL,
    or every file inside a folder). Unknown formats yield no links.
    """
    identifier = record.get("identifier", "")
    record_format = record.get("format", "")
    if record_format == "text":
        return [f"{THIS_API_URL}/download/text/{identifier}"]
    if record_format == "parquet":
        data_file = record.get("data_file", "")
        if not data_file:
            return []
        if data_file.endswith(".parquet"):
            # Single parquet file: direct resolve URL.
            return [f"{HF_DATASET_BASE_URL}/resolve/main/{data_file}"]
        # It's a folder (UUID) - fetch all files in the folder
        return get_folder_file_urls(data_file)
    # Any other format has no downloadable representation.
    return []
# Root endpoint with HTML response
@app.get("/", response_class=HTMLResponse, include_in_schema=False)
def root():
    """Serve a static HTML landing page describing the API's endpoints.

    Hidden from the OpenAPI schema (include_in_schema=False) since it is
    informational only; the page links to the project README and the
    ndl-core-client library.
    """
    return """
    <!DOCTYPE html>
    <html>
    <head>
        <title>NDL Core Data API</title>
        <style>
            body {
                font-family: Arial, sans-serif;
                max-width: 720px;
                margin: 40px auto;
                line-height: 1.6;
            }
            code {
                background: #f4f4f4;
                padding: 2px 6px;
                border-radius: 4px;
            }
        </style>
    </head>
    <body>
        <h1>NDL Core Data API</h1>
        <p>
            This Space provides a <strong>FastAPI-based service</strong> for semantic search
            and data download across NDL Core datasets.
        </p>
        <h3>Key Endpoints</h3>
        <ul>
            <li><code>GET /search</code> – Semantic search over NDL Core datasets</li>
            <li><code>GET /download/text/{identifier}</code> – Download dataset text files</li>
        </ul>
        <p>
            For detailed usage examples, parameters, and data definitions,
            see the full project README:
        </p>
        <p>
            πŸ‘‰ <a href="https://huggingface.co/spaces/theodi/ndl-core-data-api/blob/main/README.md" target="_blank">
                Project README
            </a>
        </p>
        <h3>Client Library</h3>
        <p>
            To easily interact with this API, use the official Python client library with built-in MCP server support:
        </p>
        <p>
            πŸ‘‰ <a href="https://github.com/theodi/ndl-core-client" target="_blank">
                ndl-core-client
            </a>
        </p>
    </body>
    </html>
    """