|
|
import os |
|
|
import requests |
|
|
|
|
|
|
|
|
MCP_SERVER_URL = os.getenv("MCP_SERVER_URL", "http://localhost:8000") |
|
|
HF_TOKEN = os.getenv("HF_TOKEN", "") |
|
|
|
|
|
|
|
|
def call_api_endpoint(endpoint: str, data: dict = None, method: str = "POST"): |
|
|
"""Call an API endpoint on the MCP server.""" |
|
|
url = f"{MCP_SERVER_URL}/api/{endpoint}" |
|
|
|
|
|
|
|
|
headers = {} |
|
|
if HF_TOKEN: |
|
|
headers["Authorization"] = f"Bearer {HF_TOKEN}" |
|
|
|
|
|
try: |
|
|
if method == "GET": |
|
|
response = requests.get(url, headers=headers) |
|
|
else: |
|
|
response = requests.post(url, json=data or {}, headers=headers) |
|
|
response.raise_for_status() |
|
|
return response.json() |
|
|
except Exception as e: |
|
|
print(f"Error calling API {endpoint}: {e}") |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
def get_catalog(): |
|
|
"""Fetch the dataset catalog from MCP server.""" |
|
|
result = call_api_endpoint("catalog", method="GET") |
|
|
if isinstance(result, list): |
|
|
return result |
|
|
if "error" in result: |
|
|
print(f"Error fetching catalog: {result['error']}") |
|
|
return [] |
|
|
|
|
|
|
|
|
def get_user_subscriptions(hf_user: str, hf_token: str = None): |
|
|
"""Fetch subscriptions for a specific user. Requires HF token for authentication.""" |
|
|
if not hf_user: |
|
|
return [] |
|
|
if not hf_token: |
|
|
print("Warning: hf_token required for user_subscriptions") |
|
|
return [] |
|
|
result = call_api_endpoint("user_subscriptions", { |
|
|
"hf_user": hf_user, |
|
|
"hf_token": hf_token |
|
|
}) |
|
|
if isinstance(result, list): |
|
|
return result |
|
|
if "error" in result: |
|
|
print(f"Error fetching subscriptions: {result['error']}") |
|
|
return [] |
|
|
|
|
|
|
|
|
def subscribe_free(dataset_id: str, hf_user: str, hf_token: str = None): |
|
|
"""Subscribe to a free dataset.""" |
|
|
return call_api_endpoint("subscribe_free", { |
|
|
"dataset_id": dataset_id, |
|
|
"hf_token": hf_token or "", |
|
|
"hf_user": hf_user |
|
|
}) |
|
|
|
|
|
|
|
|
def create_checkout_session(dataset_id: str, hf_user: str, hf_token: str = None): |
|
|
"""Create a Stripe checkout session for a paid dataset.""" |
|
|
return call_api_endpoint("create_checkout_session", { |
|
|
"dataset_id": dataset_id, |
|
|
"hf_token": hf_token or "", |
|
|
"hf_user": hf_user |
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
def call_mcp_tool(tool_name: str, arguments: dict): |
|
|
"""Legacy wrapper. Use specific functions above instead.""" |
|
|
if tool_name == "subscribe_free": |
|
|
return call_api_endpoint("subscribe_free", arguments) |
|
|
elif tool_name == "create_checkout_session": |
|
|
return call_api_endpoint("create_checkout_session", arguments) |
|
|
elif tool_name == "get_dataset_catalog": |
|
|
return get_catalog() |
|
|
|
|
|
print(f"Tool {tool_name} not supported via API.") |
|
|
return None |
|
|
|