# CodeLens: Streamlit front end for asking questions about a GitHub repository.
#
# The sidebar collects the repository URL, the API key and the embedding model.
# The main pane runs either a quick hybrid search (`query`) or the agentic
# loop (`run_agent`) against the indexed repository.

import os
import shutil

import numpy as np
import streamlit as st

# NumPy 2.0 compatibility shim for older versions of LangChain/HuggingFace.
# It must run BEFORE `rag_pipeline` is imported, which is why the project
# imports sit below it.
if not hasattr(np, "float_"):
    np.float_ = np.float64
if not hasattr(np, "bool_"):
    # `np.bool8` was removed in NumPy 2.0, so it cannot serve as the fallback;
    # use `np.bool` when present, otherwise the builtin.
    np.bool_ = getattr(np, "bool", bool)

from rag_pipeline import (  # noqa: E402  (must follow the NumPy shim)
    DB_ROOT,
    EMBEDDING_MODELS,
    create_qa_chain,
    create_vectorstore,
    get_cached_meta,
    get_existing_vectorstore,
    handle_remove_readonly,
    is_repo_indexed,
    load_repo,
    query,
)
from agent import run_agent  # noqa: E402

# ---------- Page Config ----------
st.set_page_config(
    page_title="CodeLens | Intelligent Code Analytics",
    page_icon="🔍",
    layout="wide",
    initial_sidebar_state="expanded",
)

# ---------- Basic Styling ----------
# NOTE(review): the markup string is empty here; presumably a <style> block
# belongs in it. Confirm against the original file.
st.markdown(""" """, unsafe_allow_html=True)

# ---------- Sidebar ----------
st.sidebar.title("🔍 CodeLens Setup")
st.sidebar.divider()

repo_url = st.sidebar.text_input(
    "📂 GitHub Repository URL",
    placeholder="https://github.com/user/repo",
)
# Normalise pasted input: surrounding whitespace or a trailing "/" would make
# `url.split("/")[-1]` (used for the repo name below) return an empty string.
repo_url = repo_url.strip().rstrip("/")

# Auto-fix missing https://
if repo_url and not repo_url.startswith("http"):
    repo_url = "https://" + repo_url

# Read API key from secrets or environment variables (for Docker/HF).
_api_key = ""
try:
    _api_key = st.secrets.get("OPENAI_API_KEY", "")
except Exception:
    # Deliberate best effort: reading secrets can fail (e.g. when none are
    # configured), in which case the environment variable below is used.
    pass

# Fallback to environment variable (standard for HF Secrets).
if not _api_key:
    _api_key = os.getenv("OPENAI_API_KEY", "")

if _api_key:
    st.sidebar.success("🔑 API Key loaded securely.")
    openai_api_key = _api_key
else:
    openai_api_key = st.sidebar.text_input("🔑 API Key", type="password")
    if not openai_api_key:
        st.sidebar.warning("⚠️ Enter key above or add OPENAI_API_KEY secret.")

st.sidebar.divider()

# Model selection
selected_model_label = st.sidebar.radio(
    "🧠 Embedding Engine",
    options=["Faster (MiniLM)", "Better (E5-Small)"],
    index=1,
    help="MiniLM is 3x faster for indexing. E5-Small provides better semantic accuracy.",
)
selected_model = EMBEDDING_MODELS[selected_model_label]

st.sidebar.divider()

# Force re-index button: drops every cached resource and the on-disk index.
if st.sidebar.button("🔄 Force Re-index"):
    st.cache_resource.clear()
    if os.path.exists(DB_ROOT):
        shutil.rmtree(DB_ROOT, onerror=handle_remove_readonly)
    st.sidebar.success("Cache cleared! Enter a URL to re-index.")
    st.rerun()

st.sidebar.divider()

with st.sidebar.expander("🏗️ How It Works"):
    st.write("**Hybrid Search Engine:**")
    st.write("1. 📥 **Clone** → Repo cloned via Git")
    st.write("2. ✂️ **Chunk** → Smart code splitting")
    st.write("3. 🧮 **Embed** → `e5-small-v2` vectors")
    st.write("4. 💾 **Store** → ChromaDB persistence")
    st.write("5. 🔍 **Semantic** → Embedding similarity")
    st.write("6. 📝 **Keyword** → Grep-style matching")
    st.write("7. 🔀 **Merge** → Best of both results")
    st.write("8. 🤖 **Answer** → Agentic Thinking Loop")

st.title("🔍 CodeLens - Intelligent Code Analytics")
st.write("Understand any GitHub repository instantly with **Hybrid Search** and **LLM Re-ranking**.")


# ---------- Initialize RAG ----------
@st.cache_resource(show_spinner=False)
def _build_rag(url, api_key, model_name):
    """Load or build the index for *url* and return the RAG handles.

    Cached per (url, api_key, model_name). Errors are allowed to propagate so
    that a failed attempt is never stored in the cache.

    Returns:
        A ``(vectorstore, qa_chain, num_chunks, repo_path)`` tuple.
    """
    model_short_name = model_name.split("/")[-1]

    if is_repo_indexed(url, model_name):
        with st.status(f"🔍 Loading {model_short_name} index...", expanded=False) as status:
            vectorstore = get_existing_vectorstore(url, model_name)
            qa_chain = create_qa_chain(vectorstore, api_key)
            meta = get_cached_meta(url, model_name)
            num_chunks = meta.get("num_chunks", 0)
            # Derive repo_path from URL.
            # NOTE(review): `.replace(".git", "")` also rewrites names that
            # merely contain ".git" (e.g. "user.github.io"). Left as-is
            # because it presumably has to match the path `load_repo`
            # produces; confirm before changing either side.
            repo_name = url.split("/")[-1].replace(".git", "")
            repo_path = f"./.{repo_name}"
            status.update(label=f"✨ Ready ({num_chunks} chunks)!", state="complete")
            return vectorstore, qa_chain, num_chunks, repo_path

    with st.status(f"📥 Indexing with {model_short_name}...", expanded=True) as status:
        documents, repo_path = load_repo(url)
        num_chunks = len(documents)
        status.update(label=f"✅ Loaded {num_chunks} chunks. Embedding...", state="running")
        vectorstore = create_vectorstore(documents, repo_url=url, model_name=model_name)
        qa_chain = create_qa_chain(vectorstore, api_key)
        status.update(label="✨ System Ready!", state="complete", expanded=False)
        return vectorstore, qa_chain, num_chunks, repo_path


def initialize_rag(url, api_key, model_name):
    """Return ``(vectorstore, qa_chain, num_chunks, repo_path)`` for *url*.

    Returns ``(None, None, 0, "")`` when *url* or *api_key* is empty, or when
    loading/indexing fails (the error is shown with ``st.error``). The
    try/except lives outside the cached helper on purpose: catching inside it
    would cache the failure tuple, and a transient error would then be
    replayed on every rerun instead of being retried.
    """
    if not url or not api_key:
        return None, None, 0, ""
    try:
        return _build_rag(url, api_key, model_name)
    except Exception as e:
        st.error(f"❌ Error: {e}")
        return None, None, 0, ""


def _run_agentic_search(question, vectorstore, repo_path, api_key):
    """Run the agent loop for *question* and render its plan, actions and answer."""
    # Run the agent inside st.status, then render expanders OUTSIDE to avoid
    # the nested-expander error.
    agent_result = {}
    with st.status("🧠 Agentic Brain Working...", expanded=True) as status:
        try:
            status.update(label="📅 Planning investigation...", state="running")
            answer, plan, actions_log, sources = run_agent(question, vectorstore, repo_path, api_key)
            agent_result = {
                "answer": answer,
                "plan": plan,
                "actions_log": actions_log,
                "sources": sources,
            }
            status.update(label="✅ Investigation Complete", state="complete", expanded=False)
        except Exception as e:
            status.update(label="❌ Agent Failed", state="error")
            st.error(f"❌ An error occurred in agent loop: {e}")

    if not agent_result:
        return

    with st.expander("📅 Investigation Plan", expanded=True):
        for i, step in enumerate(agent_result["plan"]):
            st.write(f"{i+1}. {step}")

    with st.expander("🔧 Tool Actions"):
        for i, action in enumerate(agent_result["actions_log"]):
            st.write(f"**Step {i+1}: {action['step']}**")
            st.markdown(f"- **Tool:** `{action['tool']}`")
            st.markdown(f"- **Reason:** {action['reason']}")
            st.markdown("**👁️ Observation:**")
            st.text(action['observation'])
            st.divider()

    st.subheader("✅ Final Agent Answer")
    # NOTE(review): model output is rendered with unsafe_allow_html=True and
    # no wrapping element is visible here (presumably lost). Any HTML in the
    # answer is rendered as-is; confirm this is intended.
    st.markdown(f'\n{agent_result["answer"]}\n', unsafe_allow_html=True)

    # Show sources accessed during investigation.
    if agent_result["sources"]:
        st.divider()
        st.subheader("📂 Sources Accessed")
        for src in sorted(agent_result["sources"]):
            st.markdown(f"- `{src}`")


def _run_quick_search(qa_chain, question, vectorstore, repo_path):
    """Run the single-shot hybrid query for *question* and render answer and sources."""
    with st.spinner("🧠 Searching (semantic + keyword)..."):
        try:
            answer, sources = query(qa_chain, question, vectorstore=vectorstore, repo_path=repo_path)

            st.subheader("💡 AI Answer")
            # NOTE(review): same unsafe_allow_html caveat as the agent answer.
            st.markdown(f'\n{answer}\n', unsafe_allow_html=True)

            st.divider()
            st.subheader(f"📄 Top {len(sources)} Sources")

            for i, doc in enumerate(sources):
                path = doc.metadata.get('file_path', 'Unknown')
                search_type = doc.metadata.get('search_type', 'semantic')
                # NOTE(review): tag_class is not referenced by the markup
                # below; presumably the wrapping element that used it was
                # lost. Kept so the markup can be restored.
                tag_class = "tag-semantic" if search_type == "semantic" else "tag-keyword"
                tag_label = "🧮 Semantic" if search_type == "semantic" else "📝 Keyword"

                with st.expander(f"Source {i+1}: {path}", expanded=(i == 0)):
                    st.markdown(f'{tag_label}', unsafe_allow_html=True)
                    st.code(doc.page_content)
        except Exception as e:
            st.error(f"❌ An error occurred: {e}")


if repo_url and openai_api_key:
    vectorstore, qa_chain, num_chunks, repo_path = initialize_rag(repo_url, openai_api_key, selected_model)
    if num_chunks > 0:
        st.success(
            f"✅ Indexed **{num_chunks}** chunks from **{repo_url.split('/')[-1]}** · Hybrid search active 🔀"
        )
else:
    st.info("👈 Enter a **GitHub URL** in the sidebar to begin.")
    vectorstore, qa_chain, num_chunks, repo_path = None, None, 0, ""

# ---------- Search ----------
question = st.text_input("💬 Ask a question about the code:")

# The third column is only a spacer that keeps the two buttons to the left.
col1, col2, _spacer = st.columns([1.5, 2, 3])
with col1:
    search_clicked = st.button("🔍 Quick Search")
with col2:
    agent_clicked = st.button("🧠 Agentic Search", use_container_width=True)

if search_clicked or agent_clicked:
    if not qa_chain:
        st.error("RAG system not initialized.")
    elif not question:
        st.warning("Please enter a question.")
    elif agent_clicked:
        # --- Agentic Loop ---
        _run_agentic_search(question, vectorstore, repo_path, openai_api_key)
    else:
        # --- Traditional Quick Search ---
        _run_quick_search(qa_chain, question, vectorstore, repo_path)

st.divider()