# MinCPionS / toolset_semantics.py
# Author: KoRiF — commit 2c29ed1 ("Tune Formats & Configs")
import sqlite3
from pinecone import Pinecone
import json
from typing import List, Dict, Any
import os
import requests
from dotenv import load_dotenv
load_dotenv()  # Load environment variables (e.g. PINECONE_API_KEY) from a local .env file
from sql.sql_utils import load_sql_query
# Use persistent storage ('/data') when it exists (e.g. on a hosted Space), else a local file.
DB_PATH = '/data/huggingface_spaces.db' if os.path.exists('/data') else 'huggingface_spaces.db'
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
# Names of the two Pinecone integrated-embedding indexes used below.
TOOLS_INDEX_NAME = "vix-mcp-tools"
SPACES_INDEX_NAME = "vix-mcp-spaces"
# Paths to the SQL files with the SELECT queries for tools and spaces.
SQL_SELECT_TOOLS = "sql/select_tools.sql"
SQL_SELECT_SPACES = "sql/select_spaces.sql"
import time
def create_tools_index(pc: Pinecone):
    """Ensure the Pinecone tools index exists.

    No-op when the index is already present. Otherwise creates an
    integrated-embedding index using llama-text-embed-v2, mapping each
    record's "description" field to the embedded text, then pauses
    briefly so the new index can come online.
    """
    if pc.has_index(TOOLS_INDEX_NAME):
        return
    print(f"Creating new index: {TOOLS_INDEX_NAME}")
    embed_config = {
        "model": "llama-text-embed-v2",
        "field_map": {"text": "description"},
    }
    pc.create_index_for_model(
        name=TOOLS_INDEX_NAME,
        cloud="aws",
        region="us-east-1",
        embed=embed_config,
    )
    time.sleep(5)  # Wait for index to be ready
def create_spaces_index(pc: Pinecone):
    """Ensure the Pinecone MCP-spaces index exists.

    No-op when the index is already present. Otherwise creates an
    integrated-embedding index using llama-text-embed-v2, mapping each
    record's "profile" field to the embedded text, then pauses briefly
    so the new index can come online.
    """
    if pc.has_index(SPACES_INDEX_NAME):
        return
    print(f"Creating new index: {SPACES_INDEX_NAME}")
    embed_config = {
        "model": "llama-text-embed-v2",
        "field_map": {"text": "profile"},
    }
    pc.create_index_for_model(
        name=SPACES_INDEX_NAME,
        cloud="aws",
        region="us-east-1",
        embed=embed_config,
    )
    time.sleep(5)  # Wait for index to be ready
def fetch_space_schema(space_url: str) -> Dict[str, Any]:
    """Fetch the complete MCP schema document from a space.

    Builds the Gradio MCP schema endpoint URL from *space_url* and GETs
    it with a 10s timeout. Returns the parsed JSON, or an empty dict on
    any network/HTTP/parse failure (errors are logged to stdout —
    best-effort by design).
    """
    schema_url = space_url + "/gradio_api/mcp/schema"
    try:
        resp = requests.get(schema_url, timeout=10)
        resp.raise_for_status()
        return resp.json()
    except Exception as e:
        print(f"Error fetching schema from {schema_url}: {e}")
        return {}
def prepare_space_profile(space: Dict[str, Any]) -> str:
    """Create a comprehensive, tab-separated text profile of a space.

    Concatenates the space title, its description (wrapped in
    ``(* ... *)``), its tags (wrapped in ``[ ... ]``) and, when a
    ``schema_url`` is present, the raw JSON schema fetched from that URL
    (wrapped in ``< ... >``).

    Args:
        space: Row-like mapping; ``title``, ``description``, ``tags`` and
            ``schema_url`` are all optional and may be None.

    Returns:
        The tab-joined profile string; empty when no field is populated.
    """
    descriptions: List[str] = []
    # Guard every field with .get()/truthiness: DB columns can be NULL
    # (see the "or" fallbacks used when upserting), and appending a None
    # title would make "\t".join() raise TypeError.
    if space.get('title'):
        descriptions.append(space['title'])
    if space.get('description'):
        descriptions.append(f"(* {space['description']} *)")
    if space.get('tags'):
        descriptions.append(f"[ {space['tags']} ]")
    # Add the raw schema if available; failures are best-effort since the
    # profile is still useful without it.
    if space.get('schema_url'):
        try:
            response = requests.get(space['schema_url'], timeout=10)
            schema = response.json()
            descriptions.append(f"< {json.dumps(schema)} >")
        except Exception as e:
            print(f"Error fetching schema from {space['schema_url']}: {e}")
    return "\t".join(descriptions)
def load_spaces_from_db() -> List[Dict[str, Any]]:
    """Load spaces (with their tool counts) from the SQLite database.

    Returns one dict per row of the SQL_SELECT_SPACES query.
    """
    sql = load_sql_query(SQL_SELECT_SPACES)
    with sqlite3.connect(DB_PATH) as conn:
        conn.row_factory = sqlite3.Row  # rows become dict-convertible
        rows = conn.execute(sql).fetchall()
    return [dict(row) for row in rows]
def upsert_spaces_to_pinecone(pc: Pinecone, spaces: List[Dict[str, Any]]):
    """Upload MCP spaces to the Pinecone spaces index.

    Builds one record per space (profile text plus metadata, with
    fallbacks for NULL columns) and upserts them into the "spaces"
    namespace in batches of 96, pausing between batches.
    """
    index = pc.Index(SPACES_INDEX_NAME)
    records = []
    for space in spaces:
        profile = prepare_space_profile(space)
        records.append({
            "_id": space['space_id'],
            "profile": profile or "",
            "title": space['title'] or "",
            "url": space['schema_url'] or "",
            "tool_count": space['tool_count'] or 0,
            "tags": space['tags'] or '[]',
        })
    # Upsert in chunks of 96 records with a short pause between batches.
    for start in range(0, len(records), 96):
        index.upsert_records("spaces", records[start:start + 96])
        time.sleep(1)
    print(f"Uploaded {len(spaces)} spaces")
def search_spaces(pc: Pinecone, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
    """Search the spaces index for *query* and return scored matches.

    Only hits with a score strictly above *score_threshold* are kept.
    Each match is a dict with title/url/tool_count/tags/score, returned
    in descending score order.
    """
    index = pc.Index(SPACES_INDEX_NAME)
    response = index.search(
        namespace="spaces",
        query={"top_k": top_k, "inputs": {"text": query}},
    )
    matches = []
    for hit in response['result']['hits']:
        hit_score = hit.get("_score", 0)
        if hit_score <= score_threshold:
            continue
        fields = hit.get('fields', {})
        matches.append({
            "title": fields.get("title"),
            "url": fields.get("url"),
            "tool_count": fields.get("tool_count"),
            "tags": fields.get("tags"),
            "score": hit_score,
        })
    # Highest-scoring spaces first.
    return sorted(matches, key=lambda m: m["score"], reverse=True)
def load_tools_from_db() -> List[Dict[str, Any]]:
    """Load tool rows from the SQLite database.

    Returns one dict per row of the SQL_SELECT_TOOLS query.
    """
    sql = load_sql_query(SQL_SELECT_TOOLS)
    with sqlite3.connect(DB_PATH) as conn:
        conn.row_factory = sqlite3.Row  # rows become dict-convertible
        rows = conn.execute(sql).fetchall()
    return [dict(row) for row in rows]
def upsert_tools_to_pinecone(pc: Pinecone, tools: List[Dict[str, Any]]):
    """Upload tools to the Pinecone tools index.

    Builds one record per tool (id is "<space_id>_<tool_name>") and
    upserts them into the "tools" namespace in batches of 96, pausing
    between batches.
    """
    index = pc.Index(TOOLS_INDEX_NAME)
    records = []
    for tool in tools:
        raw_schema = tool['input_schema']
        # Round-trip through json to normalize the stored schema text
        # (and substitute "{}" when the column is empty/NULL).
        normalized_schema = json.dumps(json.loads(raw_schema) if raw_schema else {})
        records.append({
            "_id": f"{tool['space_id']}_{tool['tool_name']}",
            "description": tool['description'] or "<{NO DESCRIPTION}>",
            "space_id": tool['space_id'],
            "tool_name": tool['tool_name'],
            "input_schema": normalized_schema,
            "server_url": tool['server_url'],
        })
    # Upsert in chunks of 96 records with a short pause between batches.
    for start in range(0, len(records), 96):
        index.upsert_records("tools", records[start:start + 96])
        time.sleep(1)
    print(f"Uploaded {len(tools)} tools")
def search_tools(pc: Pinecone, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
    """Search the tools index for *query* and return scored matches.

    Only hits with a score strictly above *score_threshold* are kept.
    Each match is a dict with name/description/inputSchema/server_url/
    score, returned in descending score order.
    """
    index = pc.Index(TOOLS_INDEX_NAME)
    response = index.search(
        namespace="tools",
        query={"top_k": top_k, "inputs": {"text": query}},
    )
    matches = []
    for hit in response['result']['hits']:
        hit_score = hit.get("_score", 0)
        if hit_score <= score_threshold:
            continue
        fields = hit.get('fields', {})
        matches.append({
            "name": fields.get("tool_name"),
            "description": fields.get("description"),
            # Stored as a JSON string; decode back to a dict for callers.
            "inputSchema": json.loads(fields.get("input_schema", "{}")),
            "server_url": fields.get("server_url"),
            "score": hit_score,
        })
    # Highest-scoring tools first.
    return sorted(matches, key=lambda m: m["score"], reverse=True)
def search_suitable_tools(query: str) -> List[Dict[str, Any]]:
    """Search for suitable tools based on *query* (top 13, score > 0.25)."""
    client = Pinecone(api_key=PINECONE_API_KEY)
    return search_tools(client, query, top_k=13, score_threshold=0.25)
def search_suitable_spaces(query: str) -> List[Dict[str, Any]]:
    """Search for suitable spaces based on *query* (top 3, score > 0.1)."""
    client = Pinecone(api_key=PINECONE_API_KEY)
    return search_spaces(client, query, top_k=3, score_threshold=0.1)
def initialize_and_upload_to_vector_db():
    """Initialize Pinecone and upload all tools and spaces.

    Ensures both indexes exist, then loads tools and spaces from SQLite
    and upserts them into their respective indexes.

    Returns:
        The Pinecone client, so callers can run searches next.
    """
    client = Pinecone(api_key=PINECONE_API_KEY)
    create_tools_index(client)
    create_spaces_index(client)

    print("Loading and uploading tools...")
    all_tools = load_tools_from_db()
    print(f"Loaded {len(all_tools)} tools from database")
    upsert_tools_to_pinecone(client, all_tools)

    print("\nLoading and uploading spaces...")
    all_spaces = load_spaces_from_db()
    print(f"Loaded {len(all_spaces)} spaces from database")
    upsert_spaces_to_pinecone(client, all_spaces)

    print("Upload complete!")
    return client
if __name__ == "__main__":
    pc = initialize_and_upload_to_vector_db()
    # Interactive search loop: an empty line ends the session with a
    # farewell; 'exit'/'quit' ends it silently.
    while True:
        query = input("Enter a query (or 'exit'/'quit' to stop): ")
        if not query:
            print("The End.")
            break
        if query.lower() in ("exit", "quit"):
            break
        relevant_tools = search_tools(pc, query)
        if relevant_tools:
            print("\nFound tools:")
            print(json.dumps(relevant_tools, indent=2))
        else:
            print("\nNo relevant tools found.")
        relevant_spaces = search_spaces(pc, query)
        if relevant_spaces:
            print("\nFound spaces:")
            print(json.dumps(relevant_spaces, indent=2))
        else:
            print("\nNo relevant spaces found.")