Spaces:
Running
Running
File size: 7,040 Bytes
98aa770 ec03603 98aa770 a2724e3 98aa770 ec03603 7bfb909 ec03603 7bfb909 ec03603 98aa770 ec03603 98aa770 ec03603 98aa770 ec03603 98aa770 7bfb909 98aa770 ec03603 7bfb909 277590a ec03603 277590a ec03603 98aa770 ec03603 98aa770 a2724e3 98aa770 277590a ec03603 98aa770 277590a ec03603 98aa770 ec03603 745e788 ec03603 98aa770 ec03603 98aa770 ec03603 98aa770 a2724e3 98aa770 a2724e3 ec03603 98aa770 ec03603 98aa770 ec03603 7bfb909 ec03603 7bfb909 98aa770 fc8a76d 98aa770 a2724e3 98aa770 ec03603 a2724e3 98aa770 ec03603 98aa770 a2724e3 5ef0ab2 a2724e3 ec03603 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 | """
AgentBase Visualisation UI.
Author: Arastun Mammadli
Date: [Current Date]
"""
from typing import List, Tuple
from pathlib import Path
import streamlit as st
import pandas as pd
import numpy as np
from retrieval.models.bm25 import BM25Retriever
from retrieval.models.sentence_bert import DenseRetriever
from retrieval.utils import load_queries
@st.cache_resource()
def load_retrievers(agentbase_path: str, index_configs: List[str]) -> Tuple[dict, dict, dict]:
    """
    Build one retriever of each kind for every indexing configuration.

    Decorated with ``st.cache_resource`` so the retrievers are constructed
    once and reused instead of being rebuilt on each call.

    Args:
        agentbase_path: Location of the AgentBase data, forwarded unchanged
            to every retriever constructor.
        index_configs: Names of the indexing configurations to build.

    Returns:
        Three dicts ``(bm25s, bges, toolrets)``, each keyed by indexing
        configuration name: BM25 retrievers, dense retrievers using
        "BAAI/bge-large-en-v1.5", and dense retrievers using
        "mangopy/ToolRet-trained-bge-large-en-v1.5".
    """
    bm25s = {}
    bges = {}
    toolrets = {}
    for idx_config in index_configs:
        bm25s[idx_config] = BM25Retriever(agentbase_path, index_config=idx_config)
        bges[idx_config] = DenseRetriever(
            "BAAI/bge-large-en-v1.5", agentbase_path, index_config=idx_config
        )
        toolrets[idx_config] = DenseRetriever(
            "mangopy/ToolRet-trained-bge-large-en-v1.5", agentbase_path, index_config=idx_config
        )
    return bm25s, bges, toolrets
@st.cache_data()
def load_agentbase_data(agentbase_path: str) -> pd.DataFrame:
    """
    Read the AgentBase CSV into a DataFrame, cached across calls.

    ``st.cache_data`` is used rather than ``st.cache_resource`` so that every
    caller receives its own copy of the frame: code that adds or overwrites
    columns on the result cannot alter the cached data seen by later calls.

    Args:
        agentbase_path: Path of the CSV file to read.

    Returns:
        The parsed CSV contents.
    """
    return pd.read_csv(agentbase_path)
def keyword_filter(query, top_k, df, columns=("agent_name", "agent_description")) -> pd.DataFrame:
    """
    Simple keyword-based boolean filter across specified columns.

    A row matches when ``query`` occurs, case-insensitively, as a literal
    substring of at least one of ``columns``. The query is deliberately NOT
    interpreted as a regular expression, so input such as "C++" or "(beta"
    is matched verbatim instead of raising or matching unintended text.

    Args:
        query: Text to look for.
        top_k: Maximum number of matching rows to return.
        df: Frame to search; it is not modified.
        columns: Names of the columns to search (any iterable of names).

    Returns:
        A copy of the first ``top_k`` matching rows, in their original order,
        with an added ``scores`` column set to 1 for every row.
    """
    search_columns = list(columns)  # a tuple would be read as a single (multi-index) key
    mask = df[search_columns].astype(str).apply(
        lambda col: col.str.contains(query, case=False, na=False, regex=False)
    ).any(axis=1)
    filtered_df = df[mask].head(top_k).copy()
    filtered_df["scores"] = 1
    return filtered_df
class AgentBaseUI:
    """
    AgentBase Streamlit-based UI Components.

    Three panels are provided: ``header_panel`` (title, query box and the
    model / index selectors), ``retrieval_panel`` (table of the top-k agents
    for the current query) and ``info_panel`` (the raw agent and platform
    tables).
    """

    def __init__(self, agentbase_path, platforms_path, samples_path="data/samples.json"):
        """
        Load the data tables and the retrievers.

        Args:
            agentbase_path: Path of the AgentBase CSV.
            platforms_path: Path of the platforms CSV.
            samples_path: Path handed to ``load_queries`` to obtain the
                suggested queries shown as buttons in the header.
        """
        # selection options and defaults
        self.retrieval_models = ["bge-large", "toolret", "bm25", "keyword"]
        self.selected_model = "bge-large"
        self.indexing_configs = ["v1", "naive"]
        self.indexing_config = "v1"
        self.samples_path = samples_path

        self.agents_df = load_agentbase_data(agentbase_path)
        self.platforms_df = pd.read_csv(platforms_path)
        self.bm25s, self.bges, self.toolrets = load_retrievers(
            agentbase_path, index_configs=self.indexing_configs
        )

    def header_panel(self):
        """Render the title, query suggestions, search box and selectors."""
        st.title("AgentBase Platform Demo")
        st.write("A Large-Scale Agent Collection for Automated Agent Recommendation.")
        st.subheader("🔍 Retrieval")

        if "query" not in st.session_state:
            st.session_state.query = ""

        query_suggestions = list(load_queries(self.samples_path).values())
        if query_suggestions:  # st.columns() needs at least one column
            suggestion_cols = st.columns(len(query_suggestions))
            for i, suggestion in enumerate(query_suggestions):
                # Explicit key keeps two identical suggestion texts from colliding.
                if suggestion_cols[i].button(suggestion, key=f"query_suggestion_{i}"):
                    # Assigned before the text input below is created, so the
                    # widget picks the suggestion up as its current value.
                    st.session_state.query = suggestion

        # Widgets get a real label that is hidden: "hidden" keeps the same
        # spacer an empty label produced, without the empty-label warning.
        col1, col2, col3 = st.columns([4, 1, 1])
        with col1:
            st.text_input(
                "Search query",
                placeholder="Type to search...",
                key="query",
                label_visibility="hidden",
            )
        with col2:
            self.selected_model = st.selectbox(
                "Retrieval model",
                self.retrieval_models,
                index=0,
                label_visibility="hidden",
            )
        with col3:
            self.indexing_config = st.selectbox(
                "Indexing configuration",
                self.indexing_configs,
                index=0,
                label_visibility="hidden",
            )

        _, col2 = st.columns([2, 1])
        with col2:
            with st.expander("See explanation"):
                st.write('''
                - **Retrieval Models**:
                    - **BGE-Large**: a dense retrieval model.
                    - **ToolRet**: a dense retrieval model fine-tuned for tool search.
                    - **BM25**: a sparse retrieval model.
                    - **Keyword**: simple boolean keyword matching.
                - **Indexing Configurations**:
                    - **v1**: using all columns with priority ordering (e.g., name, description come first).
                    - **naive**: using agent name and description only.
                ''')

    def retrieval_panel(self):
        """Render the Top-K slider and the table of agents for the current query."""
        top_k = st.slider("Top K", 3, 100, 5)

        if st.session_state.query:
            self.filtered_df = self.retrieve_agents(st.session_state.query, top_k)
        else:
            # No query: list every agent, with a zero score that is hidden below.
            self.filtered_df = self.agents_df.copy()
            self.filtered_df['scores'] = 0.0

        if self.filtered_df.empty:
            st.info("No agents match your search.")
            return

        # Report the number of rows actually displayed, which can be below top_k.
        shown = min(top_k, len(self.filtered_df))
        st.write(f"Showing {shown} of {len(self.agents_df)} agents")

        agent_config = {  # clean column display
            "agent_url": st.column_config.LinkColumn("agent_url", display_text="Visit →"),
            "agent_description": st.column_config.TextColumn("agent_description", width="large"),
            "agent_accessibility": st.column_config.TextColumn("agent_accessibility", width="small"),
            "agent_pricing": st.column_config.TextColumn("agent_pricing", width="medium"),
            "base_model": st.column_config.TextColumn("base_model", width="medium"),
        }
        key_columns = [
            'agent_name', 'platform_name', 'agent_description',
            'agent_pricing', 'base_model', 'agent_url', 'scores',
        ]
        if (self.filtered_df['scores'] == 0).all():
            key_columns.remove("scores")

        st.dataframe(
            self.filtered_df[key_columns].head(top_k),
            column_config=agent_config,
            use_container_width=True,
            hide_index=True,
        )

    def retrieve_agents(self, query, top_k=100) -> pd.DataFrame:
        """
        Returns a filtered dataframe with updated scores.
        Default maximum top_k of 100

        The currently selected model and indexing configuration decide which
        retriever answers the query. ``self.agents_df`` is left untouched: the
        result is a copy carrying an extra ``scores`` column, sorted by score
        in descending order. A retriever that returns no hits yields an empty
        frame.

        Raises:
            ValueError: if ``self.selected_model`` is not a known model name.
        """
        if self.selected_model == 'keyword':
            return keyword_filter(query, top_k, self.agents_df)
        elif self.selected_model == 'bm25':
            res = self.bm25s[self.indexing_config].retrieve(query, top_k)
        elif self.selected_model == 'bge-large':
            res = self.bges[self.indexing_config].retrieve(query, top_k)
        elif self.selected_model == 'toolret':
            res = self.toolrets[self.indexing_config].retrieve(query, top_k)
        else:
            raise ValueError(f"Selected model must be one of {self.retrieval_models}")

        # ``res`` is consumed as (agent_id, score) pairs; build the lookup once.
        score_by_id = dict(res)
        is_hit = self.agents_df["agent_id"].isin(list(score_by_id))
        # Copy so that scores are written to the result, not to the shared table.
        filtered_df = self.agents_df.loc[is_hit].copy()
        filtered_df["scores"] = filtered_df["agent_id"].map(score_by_id).astype(float)
        return filtered_df.sort_values(by="scores", ascending=False)

    def info_panel(self):
        """Render an expander holding the full agent and platform tables."""
        with st.expander("View AgentBase-v1.1"):
            st.dataframe(
                self.agents_df,
                use_container_width=True,
                hide_index=True,
            )
            st.dataframe(
                self.platforms_df,
                use_container_width=True,
                hide_index=True,
            )
if __name__ == "__main__":
    # Both CSV files are addressed relative to the directory holding this script.
    BASE_DIR = Path(__file__).resolve().parent
    agentbase_path = BASE_DIR.joinpath("../data/agentbase-v1.1.csv")
    platforms_path = BASE_DIR.joinpath("../data/platforms.csv")

    agentbaseui = AgentBaseUI(agentbase_path, platforms_path)

    # Draw the page top to bottom: header, retrieval results, raw tables.
    for render_panel in (
        agentbaseui.header_panel,
        agentbaseui.retrieval_panel,
        agentbaseui.info_panel,
    ):
        render_panel()
|