|
|
import sqlite3 |
|
|
from pinecone import Pinecone |
|
|
import json |
|
|
from typing import List, Dict, Any |
|
|
import os |
|
|
import requests |
|
|
from dotenv import load_dotenv |
|
|
load_dotenv() |
|
|
|
|
|
from sql.sql_utils import load_sql_query |
|
|
# SQLite database location: under /data when that path exists, otherwise a
# file relative to the current working directory.
if os.path.exists('/data'):
    DB_PATH = '/data/huggingface_spaces.db'
else:
    DB_PATH = 'huggingface_spaces.db'

# Pinecone API key read from the environment (None when the variable is unset).
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")

# Names of the two Pinecone indexes used by this module.
TOOLS_INDEX_NAME = "vix-mcp-tools"
SPACES_INDEX_NAME = "vix-mcp-spaces"

# SQL files passed to load_sql_query() to read tools and spaces.
SQL_SELECT_TOOLS = "sql/select_tools.sql"
SQL_SELECT_SPACES = "sql/select_spaces.sql"
|
|
|
|
|
import time |
|
|
|
|
|
def create_tools_index(pc: Pinecone):
    """Make sure the tools index exists, creating it when it is missing.

    Does nothing when ``pc`` already reports an index named
    ``TOOLS_INDEX_NAME``. Otherwise the index is created through
    ``create_index_for_model`` with the ``llama-text-embed-v2`` model, mapping
    the model's ``text`` input to the ``description`` field, and the function
    then pauses for five seconds.
    """
    if pc.has_index(TOOLS_INDEX_NAME):
        return

    print(f"Creating new index: {TOOLS_INDEX_NAME}")
    embedding_config = {
        "model": "llama-text-embed-v2",
        "field_map": {"text": "description"},
    }
    pc.create_index_for_model(
        name=TOOLS_INDEX_NAME,
        cloud="aws",
        region="us-east-1",
        embed=embedding_config,
    )
    # Fixed pause after creation before the caller goes on to use the index.
    time.sleep(5)
|
|
|
|
|
def create_spaces_index(pc: Pinecone):
    """Make sure the MCP spaces index exists, creating it when it is missing.

    Does nothing when ``pc`` already reports an index named
    ``SPACES_INDEX_NAME``. Otherwise the index is created through
    ``create_index_for_model`` with the ``llama-text-embed-v2`` model, mapping
    the model's ``text`` input to the ``profile`` field, and the function
    then pauses for five seconds.
    """
    if pc.has_index(SPACES_INDEX_NAME):
        return

    print(f"Creating new index: {SPACES_INDEX_NAME}")
    embedding_config = {
        "model": "llama-text-embed-v2",
        "field_map": {"text": "profile"},
    }
    pc.create_index_for_model(
        name=SPACES_INDEX_NAME,
        cloud="aws",
        region="us-east-1",
        embed=embedding_config,
    )
    # Fixed pause after creation before the caller goes on to use the index.
    time.sleep(5)
|
|
|
|
|
def fetch_space_schema(space_url: str) -> Dict[str, Any]:
    """Download the MCP schema exposed by a space.

    The schema is requested from ``{space_url}/gradio_api/mcp/schema`` with a
    10-second timeout. Any failure (request error, HTTP error status, body
    that cannot be decoded as JSON) is printed and an empty dict is returned
    instead of raising.
    """
    schema_url = f"{space_url}/gradio_api/mcp/schema"
    try:
        reply = requests.get(schema_url, timeout=10)
        reply.raise_for_status()
        schema = reply.json()
    except Exception as e:
        print(f"Error fetching schema from {schema_url}: {e}")
        return {}
    else:
        return schema
|
|
|
|
|
def prepare_space_profile(space: Dict[str, Any]) -> str:
    """Build the text profile that describes a space.

    The profile is a tab-separated string made of, in order and only when the
    value is present and non-empty: the title, the description wrapped as
    ``(* ... *)``, the tags wrapped as ``[ ... ]`` and the JSON schema
    downloaded from ``space['schema_url']`` wrapped as ``< ... >``.

    Args:
        space: Mapping describing the space. The keys ``title``,
            ``description``, ``tags`` and ``schema_url`` are read; each may
            be missing, ``None`` or empty.

    Returns:
        The joined profile, or an empty string when no part is available.
        A failed schema download is printed and simply leaves the schema
        part out; it never raises.
    """
    parts = []

    # A None title must not reach str.join, which would raise TypeError.
    title = space.get('title')
    if title:
        parts.append(str(title))

    description = space.get('description')
    if description:
        parts.append(f"(* {description} *)")

    tags = space.get('tags')
    if tags:
        parts.append(f"[ {tags} ]")

    schema_url = space.get('schema_url')
    if schema_url:
        try:
            response = requests.get(schema_url, timeout=10)
            # Reject HTTP error responses so an error payload is never
            # embedded in the profile as if it were the schema.
            response.raise_for_status()
            schema = response.json()
            parts.append(f"< {json.dumps(schema)} >")
        except Exception as e:
            # Best effort: the profile is still useful without the schema.
            print(f"Error fetching schema from {schema_url}: {e}")

    return "\t".join(parts)
|
|
|
|
|
def load_spaces_from_db() -> List[Dict[str, Any]]:
    """Load the space rows selected by the spaces query.

    Runs the SQL loaded from ``SQL_SELECT_SPACES`` against the SQLite
    database at ``DB_PATH``.

    Returns:
        One dict per result row, keyed by column name.
    """
    query = load_sql_query(SQL_SELECT_SPACES)
    # sqlite3's connection context manager only commits or rolls back; it
    # does not close the connection, so close it explicitly here.
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.row_factory = sqlite3.Row
        return [dict(row) for row in conn.execute(query)]
    finally:
        conn.close()
|
|
|
|
|
def upsert_spaces_to_pinecone(pc: Pinecone, spaces: List[Dict[str, Any]]):
    """Write one record per space into the "spaces" namespace of the spaces index.

    Each record holds the ``profile`` text built by ``prepare_space_profile``
    plus ``title``, ``url`` (taken from ``schema_url``), ``tool_count`` and
    ``tags``. Falsy source values are replaced by ``""``, ``0`` or ``'[]'``.
    Records are sent in batches of 96, with a one-second pause after each
    batch.
    """
    index = pc.Index(SPACES_INDEX_NAME)

    def to_record(space):
        # Build the profile first; it may perform a network request.
        profile = prepare_space_profile(space)
        return {
            "_id": space['space_id'],
            "profile": profile or "",
            "title": space['title'] or "",
            "url": space['schema_url'] or "",
            "tool_count": space['tool_count'] or 0,
            "tags": space['tags'] or '[]',
        }

    records = [to_record(space) for space in spaces]

    for start in range(0, len(records), 96):
        index.upsert_records("spaces", records[start:start + 96])
        time.sleep(1)

    print(f"Uploaded {len(spaces)} spaces")
|
|
|
|
|
def search_spaces(pc: Pinecone, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
    """Query the "spaces" namespace of the spaces index with free text.

    Args:
        pc: Pinecone client used to open the index.
        query: Text sent as the search input.
        top_k: Number of hits requested from the index.
        score_threshold: Only hits whose score is strictly greater than this
            value are kept.

    Returns:
        Dicts with ``title``, ``url``, ``tool_count``, ``tags`` and ``score``,
        ordered by descending score.
    """
    index = pc.Index(SPACES_INDEX_NAME)
    search_request = {"top_k": top_k, "inputs": {'text': query}}
    results = index.search(namespace="spaces", query=search_request)

    matches = []
    for hit in results['result']['hits']:
        score = hit.get("_score", 0)
        if not score > score_threshold:
            continue
        fields = hit.get('fields', {})
        matches.append({
            "title": fields.get("title"),
            "url": fields.get("url"),
            "tool_count": fields.get("tool_count"),
            "tags": fields.get("tags"),
            "score": score,
        })

    return sorted(matches, key=lambda match: match["score"], reverse=True)
|
|
|
|
|
def load_tools_from_db() -> List[Dict[str, Any]]:
    """Load the tool rows selected by the tools query.

    Runs the SQL loaded from ``SQL_SELECT_TOOLS`` against the SQLite database
    at ``DB_PATH``.

    Returns:
        One dict per result row, keyed by column name.
    """
    query = load_sql_query(SQL_SELECT_TOOLS)
    # sqlite3's connection context manager only commits or rolls back; it
    # does not close the connection, so close it explicitly here.
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.row_factory = sqlite3.Row
        return [dict(row) for row in conn.execute(query)]
    finally:
        conn.close()
|
|
|
|
|
def upsert_tools_to_pinecone(pc: Pinecone, tools: List[Dict[str, Any]]):
    """Write one record per tool into the "tools" namespace of the tools index.

    Record ids are ``"{space_id}_{tool_name}"``. A falsy description is
    replaced by the ``"<{NO DESCRIPTION}>"`` placeholder, and ``input_schema``
    is re-serialised as JSON (``{}`` when the source value is falsy). Records
    are sent in batches of 96, with a one-second pause after each batch.
    """
    index = pc.Index(TOOLS_INDEX_NAME)

    records = [
        {
            "_id": f"{tool['space_id']}_{tool['tool_name']}",
            "description": tool['description'] or "<{NO DESCRIPTION}>",
            "space_id": tool['space_id'],
            "tool_name": tool['tool_name'],
            # Round-trip through the parser so the stored schema is valid JSON.
            "input_schema": json.dumps(json.loads(tool['input_schema'] or "{}")),
            "server_url": tool['server_url'],
        }
        for tool in tools
    ]

    for start in range(0, len(records), 96):
        index.upsert_records("tools", records[start:start + 96])
        time.sleep(1)

    print(f"Uploaded {len(tools)} tools")
|
|
|
|
|
def search_tools(pc: Pinecone, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
    """Query the "tools" namespace of the tools index with free text.

    Args:
        pc: Pinecone client used to open the index.
        query: Text sent as the search input.
        top_k: Number of hits requested from the index.
        score_threshold: Only hits whose score is strictly greater than this
            value are kept.

    Returns:
        Dicts with ``name``, ``description``, ``inputSchema`` (parsed from the
        stored JSON string), ``server_url`` and ``score``, ordered by
        descending score.
    """
    index = pc.Index(TOOLS_INDEX_NAME)
    search_request = {"top_k": top_k, "inputs": {'text': query}}
    results = index.search(namespace="tools", query=search_request)

    matches = []
    for hit in results['result']['hits']:
        score = hit.get("_score", 0)
        if not score > score_threshold:
            continue
        fields = hit.get('fields', {})
        matches.append({
            "name": fields.get("tool_name"),
            "description": fields.get("description"),
            "inputSchema": json.loads(fields.get("input_schema", "{}")),
            "server_url": fields.get("server_url"),
            "score": score,
        })

    return sorted(matches, key=lambda match: match["score"], reverse=True)
|
|
|
|
|
def search_suitable_tools(query: str) -> List[Dict[str, Any]]:
    """Return tools matching ``query`` using the module's preset search limits.

    Creates a Pinecone client from ``PINECONE_API_KEY`` and delegates to
    ``search_tools`` with ``top_k=13`` and ``score_threshold=0.25``.
    """
    client = Pinecone(api_key=PINECONE_API_KEY)
    return search_tools(client, query, top_k=13, score_threshold=0.25)
|
|
|
|
|
def search_suitable_spaces(query: str) -> List[Dict[str, Any]]:
    """Return spaces matching ``query`` using the module's preset search limits.

    Creates a Pinecone client from ``PINECONE_API_KEY`` and delegates to
    ``search_spaces`` with ``top_k=3`` and ``score_threshold=0.1``.
    """
    client = Pinecone(api_key=PINECONE_API_KEY)
    return search_spaces(client, query, top_k=3, score_threshold=0.1)
|
|
|
|
|
def initialize_and_upload_to_vector_db():
    """Create the indexes if needed and upload all tools and spaces.

    Steps, in order: build a Pinecone client from ``PINECONE_API_KEY``, ensure
    both indexes exist, load the tools from the database and upsert them, then
    do the same for the spaces. Progress is printed along the way.

    Returns:
        The Pinecone client that was used, so callers can run searches on it.
    """
    client = Pinecone(api_key=PINECONE_API_KEY)
    for ensure_index in (create_tools_index, create_spaces_index):
        ensure_index(client)

    # Tools first ...
    print("Loading and uploading tools...")
    tool_rows = load_tools_from_db()
    print(f"Loaded {len(tool_rows)} tools from database")
    upsert_tools_to_pinecone(client, tool_rows)

    # ... then spaces.
    print("\nLoading and uploading spaces...")
    space_rows = load_spaces_from_db()
    print(f"Loaded {len(space_rows)} spaces from database")
    upsert_spaces_to_pinecone(client, space_rows)

    print("Upload complete!")
    return client
|
|
|
|
|
if __name__ == "__main__":
    # Populate the indexes, then answer queries typed on the console.
    pc = initialize_and_upload_to_vector_db()

    while True:
        query = input("Enter a query (or 'exit'/'quit' to stop): ")
        if not query:
            # Empty input ends the session with the farewell message;
            # 'exit'/'quit' below leave the loop without printing it.
            print("The End.")
            break
        if query.lower() in ("exit", "quit"):
            break

        relevant_tools = search_tools(pc, query)
        if not relevant_tools:
            print("\nNo relevant tools found.")
        else:
            print("\nFound tools:")
            print(json.dumps(relevant_tools, indent=2))

        relevant_spaces = search_spaces(pc, query)
        if not relevant_spaces:
            print("\nNo relevant spaces found.")
        else:
            print("\nFound spaces:")
            print(json.dumps(relevant_spaces, indent=2))