| import os |
| import json |
| import pandas as pd |
| from huggingface_hub import HfApi, hf_hub_download, InferenceClient |
|
|
# Access token and target dataset repository, both read from the environment
# at import time. HF_TOKEN is None when the variable is not set.
HF_TOKEN = os.getenv("HF_TOKEN")
REPO_ID = os.getenv("HF_DATASET_ID", "Brettapps/brettapps-aussie-mcp-databank")

# Module-wide inference client; get_embeddings() sends its requests through it.
client = InferenceClient(provider="hf-inference", api_key=HF_TOKEN)
|
|
def get_embeddings(text):
    """Return the ``facebook/bart-base`` feature-extraction output for *text*.

    The request goes through the module-level ``client``. This is a
    best-effort call: any exception is printed and ``None`` is returned
    instead of propagating.
    """
    try:
        features = client.feature_extraction(text, model="facebook/bart-base")
    except Exception as exc:
        print(f"Embedding error: {exc}")
        return None
    return features
|
|
def save_to_databank(filename, content, folder="knowledge"):
    """Write *content* to a local file and upload it to the dataset repository.

    The content is first written to ``<folder>/<filename>`` relative to the
    current working directory, then uploaded to ``REPO_ID`` (repo type
    ``dataset``) under the repo path ``<folder>/<filename>``.

    Args:
        filename: File name inside ``folder``; may include sub-directories.
        content: A ``dict`` or ``list`` is serialized as indented JSON; any
            other value is handed to ``file.write`` and must therefore be ``str``.
        folder: Local directory and repository folder to store the file in.

    Returns:
        ``True`` when the upload succeeded, ``False`` when it raised (the
        error is printed).

    Raises:
        OSError: if the local copy cannot be written. Only the upload itself
            is treated as best-effort.
    """
    api = HfApi(token=HF_TOKEN)
    path_in_repo = f"{folder}/{filename}"
    local_path = os.path.join(folder, filename)

    # Create the file's actual parent directory, so a filename with nested
    # sub-directories works. Skip when there is no directory component,
    # because os.makedirs("") raises.
    parent_dir = os.path.dirname(local_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Explicit UTF-8 keeps the stored bytes independent of the platform locale.
    with open(local_path, "w", encoding="utf-8") as f:
        if isinstance(content, (dict, list)):
            json.dump(content, f, indent=2)
        else:
            f.write(content)

    try:
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=path_in_repo,
            repo_id=REPO_ID,
            repo_type="dataset",
        )
    except Exception as e:
        print(f"Upload error: {e}")
        return False
    return True
|
|
def load_from_databank(filename, folder="knowledge"):
    """Download ``<folder>/<filename>`` from the dataset repository and return it.

    Args:
        filename: File name inside ``folder``.
        folder: Repository folder that holds the file.

    Returns:
        The parsed JSON value when *filename* ends with ``.json``, otherwise
        the file's text. ``None`` if the download, read, or JSON parse raised
        (the error is printed).
    """
    try:
        local_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=f"{folder}/{filename}",
            repo_type="dataset",
            token=HF_TOKEN,
        )
        # Decode as UTF-8 explicitly; the locale default can mis-decode or
        # fail on non-ASCII content on some platforms.
        with open(local_path, "r", encoding="utf-8") as f:
            if filename.endswith(".json"):
                return json.load(f)
            return f.read()
    except Exception as e:
        print(f"Download error: {e}")
        return None
|
|
class KnowledgeManager:
    """Semantic index over the Markdown files of a local knowledge directory."""

    # File returned when no indexed file scores better than the initial -1.
    _FALLBACK_FILE = "router_instructions.md"

    def __init__(self, knowledge_dir="knowledge"):
        # Directory scanned for ``.md`` files.
        self.knowledge_dir = knowledge_dir
        # Maps file name -> embedding as returned by get_embeddings().
        self.index = {}
        # Set to True once initialize_index() has completed a directory scan.
        self.initialized = False

    @staticmethod
    def _as_vector(embedding):
        """Reduce an embedding of any rank to a 1-D float vector.

        A 1-D input is returned unchanged. Higher-rank inputs (for example a
        tokens x hidden matrix, with or without a leading batch axis) are
        mean-pooled over every axis except the last, so texts of different
        lengths become comparable.
        """
        import numpy as np

        arr = np.asarray(embedding, dtype=float)
        if arr.ndim <= 1 or arr.size == 0:
            return arr.reshape(-1)
        return arr.reshape(-1, arr.shape[-1]).mean(axis=0)

    @staticmethod
    def _cosine(a, b):
        """Return the cosine similarity of two 1-D vectors.

        Returns ``None`` when the similarity is undefined: the shapes differ
        or either vector has zero (or NaN) norm.
        """
        import numpy as np

        a = np.asarray(a, dtype=float)
        b = np.asarray(b, dtype=float)
        if a.shape != b.shape:
            return None
        denom = np.linalg.norm(a) * np.linalg.norm(b)
        # ``not denom > 0`` also rejects NaN, which ``denom == 0`` would miss.
        if not denom > 0:
            return None
        return float(np.dot(a, b) / denom)

    def initialize_index(self):
        """Build the semantic index for all local knowledge files.

        Every ``.md`` file in ``knowledge_dir`` is read and its first 500
        characters are embedded; files whose embedding request returns
        ``None`` are skipped. Does nothing, and leaves ``initialized`` unset,
        when the directory does not exist.
        """
        if not os.path.exists(self.knowledge_dir):
            return

        for filename in os.listdir(self.knowledge_dir):
            if not filename.endswith(".md"):
                continue
            path = os.path.join(self.knowledge_dir, filename)
            # Explicit UTF-8 so non-ASCII Markdown reads the same everywhere.
            with open(path, "r", encoding="utf-8") as f:
                content = f.read()

            embedding = get_embeddings(content[:500])
            if embedding is not None:
                self.index[filename] = embedding

        self.initialized = True
        print(f"Knowledge index initialized with {len(self.index)} files.")

    def find_relevant_persona(self, query):
        """Return the indexed file name most similar to *query*.

        Similarity is the cosine between the pooled query embedding and each
        pooled file embedding. ``"router_instructions.md"`` is returned when
        the query cannot be embedded or no file scores above -1.
        """
        if not self.initialized:
            self.initialize_index()

        query_embedding = get_embeddings(query)
        if query_embedding is None:
            return self._FALLBACK_FILE

        # Pool once: the query vector is the same for every comparison.
        q_vec = self._as_vector(query_embedding)

        best_file = self._FALLBACK_FILE
        best_score = -1
        for filename, file_embedding in self.index.items():
            score = self._cosine(q_vec, self._as_vector(file_embedding))
            # Skip entries whose similarity is undefined instead of raising.
            if score is not None and score > best_score:
                best_score = score
                best_file = filename

        return best_file
|
|