"""Streamlit front-end for a Code Search API.

The page offers five tabs (search, single-file index, batch index, document
management, raw embedding) that all talk to the HTTP API whose base URL is
entered in the sidebar and kept in ``st.session_state["api_url"]``.
"""

import html
import json
import os
from pathlib import Path
from urllib.parse import quote

import requests
import streamlit as st

# ─────────────────────────── Page config ──────────────────────────────────────
st.set_page_config(
    page_title="Code Search",
    page_icon="⌕",
    layout="wide",
    initial_sidebar_state="expanded",
)

# ─────────────────────────── Custom CSS ───────────────────────────────────────
# NOTE(review): the stylesheet body is empty here — confirm whether a <style>
# block is expected in this literal.
st.markdown(""" """, unsafe_allow_html=True)


# ─────────────────────────── Helpers ──────────────────────────────────────────
def api(method: str, path: str, **kwargs) -> requests.Response:
    """Send a request to the configured API and return the raw response.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        path: Path appended to the base URL stored in
            ``st.session_state["api_url"]`` (trailing slashes are stripped
            from the base first).
        **kwargs: Forwarded to ``requests.request``. ``timeout`` defaults to
            60 seconds when not supplied.

    Raises:
        requests.RequestException: Propagated from ``requests`` on transport
            errors; callers are expected to handle it.
    """
    base = st.session_state.get("api_url", "").rstrip("/")
    timeout = kwargs.pop("timeout", 60)
    return requests.request(method, f"{base}{path}", timeout=timeout, **kwargs)


def health_check(url: str) -> bool:
    """Return True when ``<url>/health`` answers OK and reports loaded models.

    Any failure (network error, non-JSON body, unexpected payload) is treated
    as "not healthy" rather than raised, because the result only drives a
    status label.
    """
    try:
        r = requests.get(url.rstrip("/") + "/health", timeout=5)
        # bool() so the declared return type holds even if the API sends a
        # truthy non-boolean for "models_loaded".
        return bool(r.ok and r.json().get("models_loaded", False))
    except Exception:
        return False


def render_result_card(r: dict) -> None:
    """Render one search hit.

    Args:
        r: Result mapping with the keys ``rank``, ``score`` (formatted as a
            float with four decimals) and ``text``.
    """
    # The text is indexed source code and routinely contains "<...>"; it is
    # rendered with unsafe_allow_html=True, so escape it to keep it literal.
    text = html.escape(str(r['text']))
    st.markdown(f"""
#{r['rank']} score {r['score']:.4f}
{text}
""", unsafe_allow_html=True)


# ─────────────────────────── Sidebar ──────────────────────────────────────────
with st.sidebar:
    st.markdown("### ⌕ Code Search")
    st.markdown("---")

    api_url = st.text_input(
        "API URL",
        value=st.session_state.get("api_url", os.getenv("API_URL", "")),
        placeholder="https://your-space.hf.space",
        help="Your Code Search API HuggingFace Space URL",
    )
    st.session_state["api_url"] = api_url

    if api_url:
        is_up = health_check(api_url)
        # NOTE(review): `dot` is not referenced by the markup below —
        # presumably meant as a CSS class for a status indicator; confirm.
        dot = "online" if is_up else "offline"
        label = "API online · models loaded" if is_up else "API offline or unreachable"
        st.markdown(
            f''
            f'{label}',
            unsafe_allow_html=True,
        )

    st.markdown("---")
    st.markdown(
        ''
        "jina-embeddings-v2-base-code\nFAISS · AST chunking · /data persistence"
        "\n",
        unsafe_allow_html=True,
    )

# ─────────────────────────── Header ───────────────────────────────────────────
st.markdown(""" """, unsafe_allow_html=True)

if not api_url:
    st.info("👈 Enter your API URL in the sidebar to get started.")
    st.stop()

# ─────────────────────────── Tabs ─────────────────────────────────────────────
t_search, t_index, t_batch, t_docs, t_embed = st.tabs([
    "🔍 Search",
    "📄 Index File",
    "📦 Batch Index",
    "🗂 Documents",
    "🧮 Embed",
])

# ══════════════════════════════════════════════════════════════════════════════
# TAB 1 — SEARCH
# ══════════════════════════════════════════════════════════════════════════════
with t_search:
    st.markdown("#### Search an indexed codebase")

    # Fetch doc list for autocomplete. Best effort: on any failure fall back
    # to the free-text input below instead of blocking the search tab.
    doc_ids = []
    try:
        docs_resp = api("GET", "/documents", timeout=10)
        if docs_resp.ok:
            doc_ids = [d["doc_id"] for d in docs_resp.json().get("documents", [])]
    except Exception:
        doc_ids = []

    col1, col2 = st.columns([2, 1])
    with col1:
        if doc_ids:
            doc_id = st.selectbox("Document / Project ID", options=doc_ids)
        else:
            doc_id = st.text_input("Document / Project ID", placeholder="my_project")
    with col2:
        top_k = st.slider("Top K results", min_value=1, max_value=20, value=5)

    query = st.text_area(
        "Query",
        placeholder="e.g. \nfetch user from database\nor: async function that handles authentication",
        height=80,
    )

    if st.button("Search ⌕", use_container_width=True):
        if not query.strip():
            st.warning("Enter a query.")
        elif not doc_id:
            st.warning("Enter a document ID.")
        else:
            with st.spinner("Searching…"):
                try:
                    r = api("POST", "/search", json={
                        "doc_id": doc_id,
                        "query": query.strip(),
                        "top_k": top_k,
                    })
                    if r.ok:
                        data = r.json()
                        results = data.get("results", [])
                        st.markdown(f"**{len(results)} results** for `{query[:60]}`")
                        st.markdown("---")
                        for res in results:
                            render_result_card(res)
                    else:
                        st.error(f"API error {r.status_code}: {r.text}")
                except Exception as e:
                    st.error(f"Request failed: {e}")

# ══════════════════════════════════════════════════════════════════════════════
# TAB 2 — INDEX SINGLE FILE
# ══════════════════════════════════════════════════════════════════════════════
with t_index:
    st.markdown("#### Index a single source file")

    uploaded = st.file_uploader(
        "Upload file",
        type=["py","js","ts","tsx","jsx","go","rs","java","cpp","c","cs","rb","php","md","txt"],
        help="Supported: Python, JS, TS, Go, Rust, Java, C/C++, C#, Ruby, PHP, Markdown, text",
    )

    col1, col2 = st.columns(2)
    with col1:
        custom_id = st.text_input(
            "Custom doc_id (optional)",
            placeholder="Leave blank to use filename",
        )

    if uploaded:
        # File names are user-controlled; escape before HTML-enabled rendering.
        st.markdown(
            f'\n{html.escape(uploaded.name)}'
            f'{uploaded.size:,} bytes\n',
            unsafe_allow_html=True,
        )

    if st.button("Index File →", use_container_width=True):
        if not uploaded:
            st.warning("Upload a file first.")
        else:
            with st.spinner(f"Indexing `{uploaded.name}`…"):
                try:
                    files = {"file": (uploaded.name, uploaded.getvalue(), "text/plain")}
                    data = {"doc_id": custom_id.strip()}
                    r = api("POST", "/index", files=files, data=data, timeout=120)
                    if r.ok:
                        d = r.json()
                        c1, c2, c3 = st.columns(3)
                        c1.metric("doc_id", d["doc_id"])
                        c2.metric("Chunks", d["chunks_indexed"])
                        c3.metric("Status", "✓ indexed")
                        st.success(d["message"])
                    else:
                        st.error(f"API error {r.status_code}: {r.text}")
                except Exception as e:
                    st.error(f"Request failed: {e}")

# ══════════════════════════════════════════════════════════════════════════════
# TAB 3 — BATCH INDEX
# ══════════════════════════════════════════════════════════════════════════════
with t_batch:
    st.markdown("#### Batch index an entire project")
    st.markdown(
        ''
        "Upload multiple files — they will all be indexed under one shared doc_id.",
        unsafe_allow_html=True,
    )

    batch_files = st.file_uploader(
        "Upload files",
        accept_multiple_files=True,
        type=["py","js","ts","tsx","jsx","go","rs","java","cpp","c","cs","rb","php","md","txt"],
        key="batch_uploader",
    )

    col1, col2 = st.columns([2, 1])
    with col1:
        batch_id = st.text_input("Project doc_id", placeholder="my_project", key="batch_id")
    with col2:
        replace = st.checkbox("Replace existing index", value=True)

    if batch_files:
        st.markdown(f"**{len(batch_files)} file(s) queued:**")
        for f in batch_files:
            st.markdown(
                f'\n'
                f'{html.escape(f.name)}'
                f'{f.size:,} bytes'
                f'\n',
                unsafe_allow_html=True,
            )

    if st.button("Batch Index →", use_container_width=True, key="batch_btn"):
        project_id = batch_id.strip()
        if not batch_files:
            st.warning("Upload at least one file.")
        elif not project_id:
            st.warning("Enter a project doc_id.")
        else:
            payload = {
                "doc_id": project_id,
                "replace": replace,
                "files": [
                    # Undecodable bytes are replaced rather than rejected so a
                    # single odd file does not abort the whole batch.
                    {"filename": f.name, "content": f.getvalue().decode("utf-8", errors="replace")}
                    for f in batch_files
                ],
            }
            # Show the id that is actually sent (stripped), not the raw input.
            with st.spinner(f"Indexing {len(batch_files)} files into `{project_id}`…"):
                try:
                    r = api("POST", "/index/batch", json=payload, timeout=300)
                    if r.ok:
                        d = r.json()
                        c1, c2, c3 = st.columns(3)
                        c1.metric("doc_id", d["doc_id"])
                        c2.metric("Files indexed", d["files_indexed"])
                        c3.metric("Chunks indexed", d["chunks_indexed"])
                        st.success("Batch index complete!")
                    else:
                        st.error(f"API error {r.status_code}: {r.text}")
                except Exception as e:
                    st.error(f"Request failed: {e}")

# ══════════════════════════════════════════════════════════════════════════════
# TAB 4 — DOCUMENTS
# ══════════════════════════════════════════════════════════════════════════════
with t_docs:
    st.markdown("#### Indexed documents")

    # A delete triggers st.rerun(), which would discard a message shown in the
    # same run; the confirmation is carried over in session state instead.
    docs_flash = st.session_state.pop("docs_flash", None)
    if docs_flash:
        st.success(docs_flash)

    col_refresh, _ = st.columns([1, 4])
    with col_refresh:
        # Clicking any button reruns the script, which refetches the list
        # below, so the return value is not needed.
        st.button("↻ Refresh", key="refresh_docs")

    try:
        r = api("GET", "/documents", timeout=10)
        if r.ok:
            docs = r.json().get("documents", [])
            if not docs:
                st.info("No documents indexed yet. Use the Index or Batch Index tabs.")
            else:
                total_chunks = sum(d["chunks"] for d in docs)
                m1, m2 = st.columns(2)
                m1.metric("Documents", len(docs))
                m2.metric("Total chunks", total_chunks)
                st.markdown("---")

                for doc in docs:
                    doc_name = str(doc["doc_id"])
                    col_info, col_del = st.columns([5, 1])
                    with col_info:
                        st.markdown(
                            f'\n'
                            f'📁 {html.escape(doc_name)}'
                            f'{doc["chunks"]:,} chunks'
                            f'\n',
                            unsafe_allow_html=True,
                        )
                    with col_del:
                        if st.button("Delete", key=f"del_{doc['doc_id']}", type="secondary"):
                            try:
                                # Percent-encode the id: characters such as
                                # "#", "?" or "/" would otherwise change which
                                # URL is requested.
                                dr = api(
                                    "DELETE",
                                    f"/documents/{quote(doc_name, safe='')}",
                                    timeout=10,
                                )
                            except Exception as e:
                                st.error(str(e))
                            else:
                                if dr.ok:
                                    st.session_state["docs_flash"] = f"Deleted `{doc['doc_id']}`"
                                    st.rerun()
                                else:
                                    st.error(dr.text)
        else:
            st.error(f"API error {r.status_code}: {r.text}")
    except Exception as e:
        st.error(f"Could not reach API: {e}")

# ══════════════════════════════════════════════════════════════════════════════
# TAB 5 — EMBED
# ══════════════════════════════════════════════════════════════════════════════
with t_embed:
    st.markdown("#### Embed arbitrary texts")
    st.markdown(
        ''
        "Returns raw 768-dim float vectors. One text per line.",
        unsafe_allow_html=True,
    )

    raw_texts = st.text_area(
        "Texts (one per line)",
        placeholder="def getUserById(id):\n return db.query(User).filter(User.id == id).first()\nfetch user from database",
        height=160,
    )

    if st.button("Embed →", use_container_width=True):
        # A new request invalidates whatever was displayed before.
        st.session_state.pop("embed_result", None)
        texts = [t.strip() for t in raw_texts.strip().splitlines() if t.strip()]
        if not texts:
            st.warning("Enter at least one text.")
        elif len(texts) > 64:
            st.warning("Maximum 64 texts per request.")
        else:
            with st.spinner(f"Embedding {len(texts)} text(s)…"):
                try:
                    r = api("POST", "/embed", json={"texts": texts}, timeout=60)
                    if r.ok:
                        d = r.json()
                        st.session_state["embed_result"] = {
                            "texts": texts,
                            "embeddings": d["embeddings"],
                            "dimensions": d["dimensions"],
                        }
                    else:
                        st.error(f"API error {r.status_code}: {r.text}")
                except Exception as e:
                    st.error(f"Request failed: {e}")

    # Rendered from session state, outside the button branch: st.button is
    # True only for the run right after the click, so widgets nested under it
    # (the per-vector toggle) would vanish on the rerun their own click causes.
    embed_result = st.session_state.get("embed_result")
    if embed_result:
        embs = embed_result["embeddings"]
        st.metric("Dimensions", embed_result["dimensions"])
        st.markdown(f"**{len(embs)} embedding(s) returned**")
        st.markdown("---")
        for i, (txt, vec) in enumerate(zip(embed_result["texts"], embs)):
            ellipsis = "…" if len(txt) > 60 else ""
            with st.expander(f"[{i}] `{txt[:60]}{ellipsis}`"):
                preview = vec[:16]
                preview_text = ", ".join(f"{v:.5f}" for v in preview)
                # Report the real vector length and preview size instead of
                # assuming 768 / 16.
                st.markdown(
                    f''
                    f'[{preview_text}, …]\n'
                    f'dim {len(vec)} · showing first {len(preview)}'
                    f'\n',
                    unsafe_allow_html=True,
                )
                if st.toggle("Show full vector", key=f"full_{i}"):
                    st.json(vec)