Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import os | |
| import numpy as np | |
# NumPy 2.0 compatibility shim for older versions of LangChain/HuggingFace.
# NumPy 2.0 removed the `np.float_` alias; restore it before any library that
# still references it gets imported.
if not hasattr(np, "float_"):
    np.float_ = np.float64
# `np.bool_` still exists in NumPy 2.x, so this branch is purely defensive.
# The fallback must not be `np.bool8`: that alias was removed in NumPy 2.0 as
# well, so referencing it would raise AttributeError exactly when the shim is
# needed. Prefer NumPy's own `np.bool` and fall back to the builtin.
if not hasattr(np, "bool_"):
    np.bool_ = getattr(np, "bool", bool)
| from rag_pipeline import ( | |
| load_repo, create_vectorstore, create_qa_chain, query, | |
| is_repo_indexed, get_existing_vectorstore, get_cached_meta | |
| ) | |
| from agent import run_agent | |
# ---------- Page Config ----------
# Browser-tab title/icon and overall layout; kept in one mapping so the
# settings can be read at a glance.
_PAGE_SETTINGS = {
    "page_title": "CodeLens | Intelligent Code Analytics",
    "page_icon": "๐",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)

# ---------- Basic Styling ----------
# Custom stylesheet injected into the page. It styles the buttons and defines
# the helper classes used further down when results are rendered as raw HTML:
# `.answer-box`, `.search-tag` / `.tag-semantic` / `.tag-keyword`, `.source-card`.
_CUSTOM_CSS = """
<style>
.stButton>button {
    background-color: #f0f2f6;
    color: #31333f;
    border: 1px solid #d1d5db;
}
.stButton>button:hover {
    border-color: #ff4b4b;
    color: #ff4b4b;
}
.answer-box {
    padding: 24px;
    border-radius: 12px;
    border-left: 6px solid #ff4b4b;
    background-color: rgba(128, 128, 128, 0.05);
    margin-bottom: 24px;
    font-family: 'Inter', sans-serif;
    line-height: 1.6;
}
.search-tag {
    display: inline-block;
    padding: 3px 10px;
    border-radius: 6px;
    font-size: 0.7rem;
    font-weight: 700;
    margin-left: 10px;
    text-transform: uppercase;
    letter-spacing: 0.5px;
}
.tag-semantic {
    background-color: #3b82f633;
    color: #60a5fa;
    border: 1px solid #3b82f644;
}
.tag-keyword {
    background-color: #f59e0b33;
    color: #fbbf24;
    border: 1px solid #f59e0b44;
}
.source-card {
    padding: 15px;
    border-radius: 8px;
    background-color: rgba(128, 128, 128, 0.03);
    border: 1px solid rgba(128, 128, 128, 0.1);
    margin-bottom: 10px;
}
/* Hide the Streamlit 'No secrets found' warning at the top if it exists */
.stAlert { margin-top: -10px; }
header[data-testid="stHeader"] { background: transparent !important; }
</style>
"""
# unsafe_allow_html is required for the <style> element to reach the page.
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
# ---------- Sidebar ----------
st.sidebar.title("๐ CodeLens Setup")
st.sidebar.divider()

repo_url = st.sidebar.text_input(
    "๐ GitHub Repository URL",
    placeholder="https://github.com/user/repo",
)
# Normalise what the user pasted: surrounding whitespace and a trailing "/"
# would otherwise leak into the repo-name derivation done later with
# `split("/")[-1]`, which yields an empty name for ".../repo/".
repo_url = repo_url.strip().rstrip("/")
# Auto-fix a missing scheme. Checking the full "http://" / "https://" prefixes
# (case-insensitively) avoids treating hosts that merely start with "http"
# as already having a scheme.
if repo_url and not repo_url.lower().startswith(("http://", "https://")):
    repo_url = "https://" + repo_url

# Read the API key from Streamlit secrets or environment variables (Docker/HF).
_api_key = ""
try:
    _api_key = st.secrets.get("OPENAI_API_KEY", "")
except Exception:
    # Deliberate best-effort: reading st.secrets can raise when no secrets
    # file is configured (the exception type differs between Streamlit
    # versions), in which case we simply fall through to the environment.
    pass
# Fallback to the environment variable (standard for HF Secrets).
if not _api_key:
    _api_key = os.getenv("OPENAI_API_KEY", "")

if _api_key:
    st.sidebar.success("๐ API Key loaded securely.")
    openai_api_key = _api_key
else:
    # Strip stray whitespace that commonly comes along with a pasted key.
    openai_api_key = st.sidebar.text_input("๐ API Key", type="password").strip()
    if not openai_api_key:
        st.sidebar.warning("โ ๏ธ Enter key above or add OPENAI_API_KEY secret.")
st.sidebar.divider()
# Model Selection
# The radio labels double as keys into EMBEDDING_MODELS, so they must stay in
# sync with that mapping in rag_pipeline.
_ENGINE_CHOICES = ["Faster (MiniLM)", "Better (E5-Small)"]
selected_model_label = st.sidebar.radio(
    "๐ง Embedding Engine",
    options=_ENGINE_CHOICES,
    index=1,  # default to the second entry, "Better (E5-Small)"
    help="MiniLM is 3x faster for indexing. E5-Small provides better semantic accuracy.",
)
from rag_pipeline import EMBEDDING_MODELS

# Translate the human-readable label into the value stored under it.
selected_model = EMBEDDING_MODELS[selected_model_label]
st.sidebar.divider()
# Force re-index button
# Wipes Streamlit's resource cache and the on-disk index directory, then
# restarts the script. `st.rerun()` aborts the current run immediately, so a
# message emitted just before it would never be seen; instead a one-shot flag
# is stored in session state and the confirmation is shown on the next run.
_REINDEX_NOTICE_KEY = "_codelens_reindex_notice"
if st.sidebar.button("๐ Force Re-index"):
    st.cache_resource.clear()
    import shutil
    from rag_pipeline import DB_ROOT, handle_remove_readonly

    if os.path.exists(DB_ROOT):
        # NOTE(review): handle_remove_readonly is presumably the usual
        # "clear read-only bit and retry" callback — confirm in rag_pipeline.
        shutil.rmtree(DB_ROOT, onerror=handle_remove_readonly)
    st.session_state[_REINDEX_NOTICE_KEY] = True
    st.rerun()
# Show (and consume) the confirmation left behind by the previous run.
if st.session_state.pop(_REINDEX_NOTICE_KEY, False):
    st.sidebar.success("Cache cleared! Enter a URL to re-index.")
st.sidebar.divider()
# Sidebar explainer: one heading line followed by the numbered pipeline steps,
# each written as its own element inside the expander.
_HOW_IT_WORKS_LINES = (
    "**Hybrid Search Engine:**",
    "1. ๐ฅ **Clone** โ Repo cloned via Git",
    "2. โ๏ธ **Chunk** โ Smart code splitting",
    "3. ๐งฎ **Embed** โ `e5-small-v2` vectors",
    "4. ๐พ **Store** โ ChromaDB persistence",
    "5. ๐ **Semantic** โ Embedding similarity",
    "6. ๐ **Keyword** โ Grep-style matching",
    "7. ๐ **Merge** โ Best of both results",
    "8. ๐ค **Answer** โ Agentic Thinking Loop",
)
with st.sidebar.expander("๐๏ธ How It Works"):
    for _line in _HOW_IT_WORKS_LINES:
        st.write(_line)

# Main-page heading and tagline.
st.title("๐ CodeLens - Intelligent Code Analytics")
st.write("Understand any GitHub repository instantly with **Hybrid Search** and **LLM Re-ranking**.")
# ---------- Initialize RAG ----------
def initialize_rag(url: str, api_key: str, model_name: str) -> tuple:
    """Load an existing index for a repository, or build one, and create the QA chain.

    If ``is_repo_indexed`` reports an index for ``(url, model_name)``, the
    stored vector store and its cached metadata are reused. Otherwise the
    repository is loaded via ``load_repo`` and embedded from scratch.
    Progress is reported through ``st.status`` widgets.

    Args:
        url: Repository URL. Surrounding whitespace and trailing slashes are
            ignored; a blank value is treated as "not provided".
        api_key: Key passed to ``create_qa_chain``.
        model_name: Embedding model identifier; only the part after the last
            "/" is shown in status labels.

    Returns:
        ``(vectorstore, qa_chain, num_chunks, repo_path)``. When the URL or
        key is missing, or any step raises, ``(None, None, 0, "")`` is
        returned (the error is displayed with ``st.error``).
    """
    # Normalise once so that a whitespace-only URL is rejected and a trailing
    # "/" cannot produce an empty repo name in the split below.
    url = (url or "").strip().rstrip("/")
    if not url or not api_key:
        return None, None, 0, ""

    try:
        # Computed inside the try so a malformed model_name is reported via
        # st.error like every other failure.
        short_model = model_name.split("/")[-1]

        if is_repo_indexed(url, model_name):
            with st.status(f"๐ Loading {short_model} index...", expanded=False) as status:
                vectorstore = get_existing_vectorstore(url, model_name)
                qa_chain = create_qa_chain(vectorstore, api_key)
                meta = get_cached_meta(url, model_name)
                num_chunks = meta.get("num_chunks", 0)
                # Derive repo_path from the URL.
                # NOTE(review): str.replace removes ".git" anywhere in the
                # name (e.g. "user.github.io"), not just a trailing suffix.
                # Left unchanged because the path presumably has to match the
                # one load_repo produces — confirm against rag_pipeline.
                repo_name = url.split("/")[-1].replace(".git", "")
                repo_path = f"./.{repo_name}"
                status.update(label=f"โจ Ready ({num_chunks} chunks)!", state="complete")
            return vectorstore, qa_chain, num_chunks, repo_path

        with st.status(f"๐ฅ Indexing with {short_model}...", expanded=True) as status:
            documents, repo_path = load_repo(url)
            num_chunks = len(documents)
            status.update(label=f"โ Loaded {num_chunks} chunks. Embedding...", state="running")
            vectorstore = create_vectorstore(documents, repo_url=url, model_name=model_name)
            qa_chain = create_qa_chain(vectorstore, api_key)
            status.update(label="โจ System Ready!", state="complete", expanded=False)
        return vectorstore, qa_chain, num_chunks, repo_path
    except Exception as e:
        # Top-level UI boundary: surface the failure instead of crashing the app.
        st.error(f"โ Error: {e}")
        return None, None, 0, ""
# Build (or load) the RAG components once both a repository URL and an API key
# are available; otherwise prompt the user and keep the empty placeholders.
vectorstore, qa_chain, num_chunks, repo_path = None, None, 0, ""
if repo_url and openai_api_key:
    vectorstore, qa_chain, num_chunks, repo_path = initialize_rag(
        repo_url, openai_api_key, selected_model
    )
    # A chunk count of zero means initialisation failed or found nothing.
    if num_chunks > 0:
        st.success(f"โ Indexed **{num_chunks}** chunks from **{repo_url.split('/')[-1]}** ยท Hybrid search active ๐")
else:
    st.info("๐ Enter a **GitHub URL** in the sidebar to begin.")
# ---------- Search ----------
# Question box plus two actions: a single-shot "Quick Search" through the QA
# chain, and an "Agentic Search" that runs the multi-step agent.
question = st.text_input("๐ฌ Ask a question about the code:")
col1, col2, col3 = st.columns([1.5, 2, 3])
with col1:
    search_clicked = st.button("๐ Quick Search")
with col2:
    agent_clicked = st.button("๐ง Agentic Search", use_container_width=True)

if search_clicked or agent_clicked:
    # A whitespace-only question must count as empty; otherwise it slips past
    # the guard below and is sent to the model.
    question = question.strip()
    if not qa_chain:
        st.error("RAG system not initialized.")
    elif not question:
        st.warning("Please enter a question.")
    elif agent_clicked:
        # --- Agentic Loop ---
        # Run the agent inside st.status, then render expanders OUTSIDE to avoid nesting error
        _agent_result = {}
        with st.status("๐ง Agentic Brain Working...", expanded=True) as status:
            try:
                status.update(label="๐ Planning investigation...", state="running")
                _answer, _plan, _actions_log, _sources = run_agent(
                    question, vectorstore, repo_path, openai_api_key
                )
                _agent_result = {
                    "answer": _answer,
                    "plan": _plan,
                    "actions_log": _actions_log,
                    "sources": _sources,
                }
                status.update(label="โ Investigation Complete", state="complete", expanded=False)
            except Exception as e:
                status.update(label="โ Agent Failed", state="error")
                st.error(f"โ An error occurred in agent loop: {e}")

        # Render results OUTSIDE st.status (avoids nested expander Streamlit bug).
        # _agent_result stays empty when the agent raised, so nothing is shown.
        if _agent_result:
            with st.expander("๐ Investigation Plan", expanded=True):
                for i, step in enumerate(_agent_result["plan"]):
                    st.write(f"{i+1}. {step}")
            with st.expander("๐ง Tool Actions"):
                for i, action in enumerate(_agent_result["actions_log"]):
                    st.write(f"**Step {i+1}: {action['step']}**")
                    st.markdown(f"- **Tool:** `{action['tool']}`")
                    st.markdown(f"- **Reason:** {action['reason']}")
                    st.markdown("**๐๏ธ Observation:**")
                    # st.text keeps the raw tool output unformatted.
                    st.text(action['observation'])
                    st.divider()
            st.subheader("โ Final Agent Answer")
            # NOTE(review): the model's answer is interpolated into raw HTML
            # with unsafe_allow_html=True. It is derived from repository
            # content, so markup in it reaches the page unescaped. Not escaped
            # here because that would corrupt code shown in fenced blocks;
            # consider rendering the answer with plain st.markdown instead.
            st.markdown(f'<div class="answer-box">{_agent_result["answer"]}</div>', unsafe_allow_html=True)
            # Show sources accessed during investigation
            if _agent_result["sources"]:
                st.divider()
                st.subheader("๐ Sources Accessed")
                for src in sorted(_agent_result["sources"]):
                    st.markdown(f"- `{src}`")
    else:
        # --- Traditional Quick Search ---
        with st.spinner("๐ง Searching (semantic + keyword)..."):
            try:
                answer, sources = query(qa_chain, question, vectorstore=vectorstore, repo_path=repo_path)
                st.subheader("๐ก AI Answer")
                # NOTE(review): same raw-HTML caveat as the agent answer above.
                st.markdown(f'<div class="answer-box">{answer}</div>', unsafe_allow_html=True)
                st.divider()
                st.subheader(f"๐ Top {len(sources)} Sources")
                for i, doc in enumerate(sources):
                    path = doc.metadata.get('file_path', 'Unknown')
                    # Anything not explicitly tagged "semantic" is shown as a keyword hit.
                    search_type = doc.metadata.get('search_type', 'semantic')
                    is_semantic = search_type == "semantic"
                    tag_class = "tag-semantic" if is_semantic else "tag-keyword"
                    tag_label = "๐งฎ Semantic" if is_semantic else "๐ Keyword"
                    # Only the first source starts expanded.
                    with st.expander(f"Source {i+1}: {path}", expanded=(i == 0)):
                        st.markdown(
                            f'<span class="search-tag {tag_class}">{tag_label}</span>',
                            unsafe_allow_html=True,
                        )
                        st.code(doc.page_content)
            except Exception as e:
                st.error(f"โ An error occurred: {e}")
| st.divider() | |