import streamlit as st
import requests
import json
import os
from pathlib import Path
# ─────────────────────────── Page config ──────────────────────────────────────
# Page-level settings: browser-tab title and icon, a wide layout, and the
# sidebar open on first load.
_PAGE_SETTINGS = {
    "page_title": "Code Search",
    "page_icon": "⌕",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)
# ─────────────────────────── Custom CSS ───────────────────────────────────────
# Slot for the custom stylesheet. The markup body is empty here, so the call
# emits a single newline with raw HTML enabled.
st.markdown("\n", unsafe_allow_html=True)
# ─────────────────────────── Helpers ──────────────────────────────────────────
def api(method: str, path: str, **kwargs) -> requests.Response:
    """Send an HTTP request to the API whose base URL is in session state.

    Args:
        method: HTTP verb handed to ``requests.request``.
        path: Path appended verbatim to the base URL (trailing slashes on
            the base URL are removed first).
        **kwargs: Forwarded to ``requests.request``. ``timeout`` falls back
            to 60 when the caller does not pass one.

    Returns:
        The ``requests.Response`` produced by the call.
    """
    # Only fills in the timeout when the caller did not supply one.
    kwargs.setdefault("timeout", 60)
    root = st.session_state.get("api_url", "").rstrip("/")
    return requests.request(method, root + path, **kwargs)
def health_check(url: str) -> bool:
    """Probe ``<url>/health`` and report whether the API says it is ready.

    Args:
        url: Base URL of the API; trailing slashes are ignored.

    Returns:
        ``False`` when the request fails, the status is not OK, or anything
        else goes wrong; otherwise the ``models_loaded`` value from the JSON
        body (``False`` when the key is absent).
    """
    try:
        resp = requests.get(f"{url.rstrip('/')}/health", timeout=5)
        if not resp.ok:
            return False
        return resp.json().get("models_loaded", False)
    except Exception:
        # Best-effort probe: every failure is reported as "not healthy".
        return False
def render_result_card(r: dict):
    """Render one search hit: a rank/score line followed by the hit's text.

    Args:
        r: Result mapping. Reads ``r["rank"]``, ``r["score"]`` (formatted to
            four decimals) and ``r["text"]``.

    Raises:
        KeyError: If any of the three keys is missing.
    """
    st.markdown(f"#{r['rank']} · score {r['score']:.4f}")
    # The text comes back from the search API (presumably a chunk of an
    # indexed source file). Rendering it as Markdown with raw HTML enabled
    # would reinterpret characters such as `#`, `*`, `_` and `<`, and would
    # let that content inject markup into the page. A plain code block shows
    # it verbatim.
    st.code(r["text"], language=None)
# ─────────────────────────── Sidebar ──────────────────────────────────────────
with st.sidebar:
    st.markdown("### ⌕ Code Search")
    st.markdown("---")
    # Initial value: whatever session state already holds, else the API_URL
    # environment variable, else an empty string.
    api_url = st.text_input(
        "API URL",
        value=st.session_state.get("api_url", os.getenv("API_URL", "")),
        placeholder="https://your-space.hf.space",
        help="Your Code Search API HuggingFace Space URL",
    )
    # api() reads the base URL from session state, so keep it in sync.
    st.session_state["api_url"] = api_url
    if api_url:
        is_up = health_check(api_url)
        label = "API online · models loaded" if is_up else "API offline or unreachable"
        st.markdown(label, unsafe_allow_html=True)
    st.markdown("---")
    # The two lines were a single string literal broken across physical
    # lines, which does not parse. Two trailing spaces before the newline are
    # a Markdown hard line break, so the text still renders on two lines.
    st.markdown(
        "jina-embeddings-v2-base-code  \n"
        "FAISS · AST chunking · /data persistence",
        unsafe_allow_html=True,
    )
# ─────────────────────────── Header ───────────────────────────────────────────
# Slot for the page header. The markup body is empty here, so only a newline
# is emitted.
st.markdown("\n", unsafe_allow_html=True)

# Every tab below talks to the API, so halt this script run until a URL has
# been entered in the sidebar.
_api_url_missing = not api_url
if _api_url_missing:
    st.info("👈 Enter your API URL in the sidebar to get started.")
    st.stop()
# ─────────────────────────── Tabs ─────────────────────────────────────────────
# One tab per feature; the order of the labels fixes the order of the
# containers unpacked on the next line.
_TAB_LABELS = ["🔍 Search", "📄 Index File", "📦 Batch Index", "🗂 Documents", "🧮 Embed"]
t_search, t_index, t_batch, t_docs, t_embed = st.tabs(_TAB_LABELS)
# ══════════════════════════════════════════════════════════════════════════════
# TAB 1 — SEARCH
# ══════════════════════════════════════════════════════════════════════════════
with t_search:
    st.markdown("#### Search an indexed codebase")

    # Best effort: fetch the known doc_ids so a dropdown can be offered. Any
    # failure (request error, bad JSON, missing key) leaves the list empty
    # and the free-text input is shown instead.
    doc_ids = []
    try:
        listing = api("GET", "/documents", timeout=10)
        if listing.ok:
            doc_ids = [entry["doc_id"] for entry in listing.json().get("documents", [])]
    except Exception:
        pass

    id_col, k_col = st.columns([2, 1])
    with id_col:
        if not doc_ids:
            doc_id = st.text_input("Document / Project ID", placeholder="my_project")
        else:
            doc_id = st.selectbox("Document / Project ID", options=doc_ids)
    with k_col:
        top_k = st.slider("Top K results", min_value=1, max_value=20, value=5)

    query = st.text_area(
        "Query",
        placeholder="e.g. fetch user from database\nor: async function that handles authentication",
        height=80,
    )

    if st.button("Search ⌕", use_container_width=True):
        cleaned_query = query.strip()
        if not cleaned_query:
            st.warning("Enter a query.")
        elif not doc_id:
            st.warning("Enter a document ID.")
        else:
            search_body = {"doc_id": doc_id, "query": cleaned_query, "top_k": top_k}
            with st.spinner("Searching…"):
                try:
                    resp = api("POST", "/search", json=search_body)
                    if not resp.ok:
                        st.error(f"API error {resp.status_code}: {resp.text}")
                    else:
                        hits = resp.json().get("results", [])
                        # The heading echoes the query as typed, cut to 60 characters.
                        st.markdown(f"**{len(hits)} results** for `{query[:60]}`")
                        st.markdown("---")
                        for hit in hits:
                            render_result_card(hit)
                except Exception as e:
                    st.error(f"Request failed: {e}")
# ══════════════════════════════════════════════════════════════════════════════
# TAB 2 — INDEX SINGLE FILE
# ══════════════════════════════════════════════════════════════════════════════
with t_index:
    st.markdown("#### Index a single source file")
    uploaded = st.file_uploader(
        "Upload file",
        type=["py","js","ts","tsx","jsx","go","rs","java","cpp","c","cs","rb","php","md","txt"],
        help="Supported: Python, JS, TS, Go, Rust, Java, C/C++, C#, Ruby, PHP, Markdown, text",
    )
    col1, col2 = st.columns(2)
    with col1:
        custom_id = st.text_input(
            "Custom doc_id (optional)",
            placeholder="Leave blank to use filename",
        )
    if uploaded:
        # The original literal was split across two physical lines (a syntax
        # error) and ran the name straight into the size. The name goes in a
        # code span so Markdown does not reinterpret characters such as the
        # underscores in `__init__.py`; raw HTML is not needed for this line.
        st.markdown(f"`{uploaded.name}` · {uploaded.size:,} bytes")
    if st.button("Index File →", use_container_width=True):
        if not uploaded:
            st.warning("Upload a file first.")
        else:
            with st.spinner(f"Indexing `{uploaded.name}`…"):
                try:
                    # Multipart upload: the file part plus a form field that
                    # carries the (possibly empty) custom doc_id.
                    files = {"file": (uploaded.name, uploaded.getvalue(), "text/plain")}
                    data = {"doc_id": custom_id.strip()}
                    r = api("POST", "/index", files=files, data=data, timeout=120)
                    if r.ok:
                        d = r.json()
                        c1, c2, c3 = st.columns(3)
                        c1.metric("doc_id", d["doc_id"])
                        c2.metric("Chunks", d["chunks_indexed"])
                        c3.metric("Status", "✓ indexed")
                        st.success(d["message"])
                    else:
                        st.error(f"API error {r.status_code}: {r.text}")
                except Exception as e:
                    # Also reached when the response is not JSON or lacks an
                    # expected key.
                    st.error(f"Request failed: {e}")
# ══════════════════════════════════════════════════════════════════════════════
# TAB 3 — BATCH INDEX
# ══════════════════════════════════════════════════════════════════════════════
with t_batch:
    st.markdown("#### Batch index an entire project")
    st.markdown(
        "Upload multiple files — they will all be indexed under one shared doc_id.",
        unsafe_allow_html=True,
    )
    batch_files = st.file_uploader(
        "Upload files",
        accept_multiple_files=True,
        type=["py","js","ts","tsx","jsx","go","rs","java","cpp","c","cs","rb","php","md","txt"],
        key="batch_uploader",
    )
    col1, col2 = st.columns([2, 1])
    with col1:
        batch_id = st.text_input("Project doc_id", placeholder="my_project", key="batch_id")
    with col2:
        replace = st.checkbox("Replace existing index", value=True)
    if batch_files:
        st.markdown(f"**{len(batch_files)} file(s) queued:**")
        for f in batch_files:
            # The original literal was split across physical lines (a syntax
            # error) and ran the name straight into the size. The code span
            # keeps Markdown from reinterpreting characters in the file name.
            st.markdown(f"`{f.name}` · {f.size:,} bytes")
    if st.button("Batch Index →", use_container_width=True, key="batch_btn"):
        # Strip once so the spinner shows the same id that is sent.
        project_id = batch_id.strip()
        if not batch_files:
            st.warning("Upload at least one file.")
        elif not project_id:
            st.warning("Enter a project doc_id.")
        else:
            payload = {
                "doc_id": project_id,
                "replace": replace,
                "files": [
                    # Contents travel as JSON text; bytes that are not valid
                    # UTF-8 are replaced rather than aborting the batch.
                    {"filename": f.name, "content": f.getvalue().decode("utf-8", errors="replace")}
                    for f in batch_files
                ],
            }
            with st.spinner(f"Indexing {len(batch_files)} files into `{project_id}`…"):
                try:
                    r = api("POST", "/index/batch", json=payload, timeout=300)
                    if r.ok:
                        d = r.json()
                        c1, c2, c3 = st.columns(3)
                        c1.metric("doc_id", d["doc_id"])
                        c2.metric("Files indexed", d["files_indexed"])
                        c3.metric("Chunks indexed", d["chunks_indexed"])
                        st.success("Batch index complete!")
                    else:
                        st.error(f"API error {r.status_code}: {r.text}")
                except Exception as e:
                    st.error(f"Request failed: {e}")
# ══════════════════════════════════════════════════════════════════════════════
# TAB 4 — DOCUMENTS
# ══════════════════════════════════════════════════════════════════════════════
with t_docs:
    st.markdown("#### Indexed documents")
    col_refresh, _ = st.columns([1, 4])
    with col_refresh:
        # The return value is not needed: a click reruns the script, and the
        # rerun fetches the list again below.
        st.button("↻ Refresh", key="refresh_docs")
    try:
        r = api("GET", "/documents", timeout=10)
        if r.ok:
            docs = r.json().get("documents", [])
            if not docs:
                st.info("No documents indexed yet. Use the Index or Batch Index tabs.")
            else:
                total_chunks = sum(d["chunks"] for d in docs)
                m1, m2 = st.columns(2)
                m1.metric("Documents", len(docs))
                m2.metric("Total chunks", total_chunks)
                st.markdown("---")
                for doc in docs:
                    col_info, col_del = st.columns([5, 1])
                    with col_info:
                        # The original literal was split across physical lines
                        # (a syntax error). The doc_id goes in a code span so
                        # Markdown does not reinterpret its characters.
                        st.markdown(f"📁 `{doc['doc_id']}` · {doc['chunks']:,} chunks")
                    with col_del:
                        # Keys embed the doc_id so each row gets its own button.
                        if st.button("Delete", key=f"del_{doc['doc_id']}", type="secondary"):
                            try:
                                dr = api("DELETE", f"/documents/{doc['doc_id']}", timeout=10)
                                if dr.ok:
                                    st.success(f"Deleted `{doc['doc_id']}`")
                                    # Rerun so the list is fetched again
                                    # without the deleted entry.
                                    st.rerun()
                                else:
                                    st.error(dr.text)
                            except Exception as e:
                                st.error(str(e))
        else:
            st.error(f"API error {r.status_code}: {r.text}")
    except Exception as e:
        st.error(f"Could not reach API: {e}")
# ══════════════════════════════════════════════════════════════════════════════
# TAB 5 — EMBED
# ══════════════════════════════════════════════════════════════════════════════
with t_embed:
    st.markdown("#### Embed arbitrary texts")
    st.markdown(
        "Returns raw 768-dim float vectors. One text per line.",
        unsafe_allow_html=True,
    )
    raw_texts = st.text_area(
        "Texts (one per line)",
        placeholder="def getUserById(id):\n return db.query(User).filter(User.id == id).first()\nfetch user from database",
        height=160,
    )
    if st.button("Embed →", use_container_width=True):
        # A new attempt discards whatever an earlier click stored.
        st.session_state["embed_result"] = None
        texts = [t.strip() for t in raw_texts.strip().splitlines() if t.strip()]
        if not texts:
            st.warning("Enter at least one text.")
        elif len(texts) > 64:
            st.warning("Maximum 64 texts per request.")
        else:
            with st.spinner(f"Embedding {len(texts)} text(s)…"):
                try:
                    r = api("POST", "/embed", json={"texts": texts}, timeout=60)
                    if r.ok:
                        d = r.json()
                        st.session_state["embed_result"] = {
                            "texts": texts,
                            "embeddings": d["embeddings"],
                            "dimensions": d["dimensions"],
                        }
                    else:
                        st.error(f"API error {r.status_code}: {r.text}")
                except Exception as e:
                    st.error(f"Request failed: {e}")

    # Results are drawn from session state, not inside the button branch.
    # st.button is True only on the rerun caused by its own click, so the
    # "Show full vector" toggles (which trigger a rerun themselves) would
    # otherwise make the whole result list disappear.
    embed_result = st.session_state.get("embed_result")
    if embed_result:
        try:
            embs = embed_result["embeddings"]
            st.metric("Dimensions", embed_result["dimensions"])
            st.markdown(f"**{len(embs)} embedding(s) returned**")
            st.markdown("---")
            for i, (txt, vec) in enumerate(zip(embed_result["texts"], embs)):
                title = f"[{i}] `{txt[:60]}{'…' if len(txt) > 60 else ''}`"
                with st.expander(title):
                    preview = vec[:16]
                    shown = ", ".join(f"{v:.5f}" for v in preview)
                    # Only hint at more values when some were actually cut.
                    tail = ", …" if len(vec) > len(preview) else ""
                    # Sizes come from the vector itself rather than fixed
                    # numbers. Two trailing spaces before the newline are a
                    # Markdown hard line break.
                    st.markdown(
                        f"`[{shown}{tail}]`  \n"
                        f"dim {len(vec)} · showing first {len(preview)}"
                    )
                    if st.toggle("Show full vector", key=f"full_{i}"):
                        st.json(vec)
        except Exception as e:
            # A malformed payload is reported instead of crashing the page.
            st.error(f"Could not display embeddings: {e}")