File size: 4,297 Bytes
8e72e1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import os
import hashlib
import streamlit as st

st.set_page_config(
    page_title="Codebase Intelligence Agent",
    page_icon="🧭",
    layout="wide",
)

# Copy the OpenAI key from Streamlit/HF secrets into the environment, but only
# when the environment has no non-empty value of its own. Any failure while
# reading secrets is ignored on purpose so the app still starts without them.
try:
    if not os.getenv("OPENAI_API_KEY"):
        if "OPENAI_API_KEY" in st.secrets:
            os.environ["OPENAI_API_KEY"] = st.secrets["OPENAI_API_KEY"]
except Exception:
    pass

st.title("🧭 Codebase Intelligence Agent")
st.write(
    "Upload a Python repository (ZIP) and ask questions — answers come with exact file/line citations."
)


# ---- models: loaded once per server process and shared across sessions ----
@st.cache_resource(show_spinner="Loading models (first run only)...")
def load_models():
    """Instantiate the embedder, reranker and answerer and return them by name.

    The project imports live inside the function, so they only run when the
    function body actually executes; ``st.cache_resource`` reuses the returned
    dict on later calls.

    Returns:
        A dict with the keys ``"embedder"``, ``"reranker"`` and ``"answerer"``.
    """
    from src.rag.embedder import Embedder
    from src.rag.reranker import Reranker
    from src.rag.answerer import Answerer

    models = {}
    models["embedder"] = Embedder()
    models["reranker"] = Reranker()
    models["answerer"] = Answerer()
    return models


# ---- ingest + chunk, cached per uploaded file ----
@st.cache_data(show_spinner="Scanning and parsing repository...")
def ingest(file_bytes: bytes, file_name: str):
    """Write the uploaded ZIP to a fresh temp directory, then scan and chunk it.

    Args:
        file_bytes: Raw content of the uploaded archive.
        file_name: Name reported for the upload. Only its final path component
            is used when choosing where to write the archive.

    Returns:
        A ``(chunks, n_files)`` tuple: the result of ``chunk_repo`` and the
        number of files returned by ``scan_repo``.
    """
    import tempfile
    from src.ingestion.scanner import scan_repo
    from src.ingestion.chunker import chunk_repo

    # The name is supplied by the client, so never let it steer the write
    # location: drop directory components (either separator style) and fall
    # back to a fixed name when nothing usable is left.
    safe_name = os.path.basename(file_name.replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        safe_name = "upload.zip"

    # NOTE(review): the temp directory is intentionally left in place; it is
    # not visible here whether scan_repo/chunk_repo results still refer to
    # files under it. Confirm, then clean it up if that is safe.
    work_dir = tempfile.mkdtemp()
    tmp_zip = os.path.join(work_dir, safe_name)
    with open(tmp_zip, "wb") as f:
        f.write(file_bytes)

    files, _ = scan_repo(tmp_zip)
    chunks = chunk_repo(files)
    return chunks, len(files)


# ---- build index, cached per file (underscore args skip hashing) ----
@st.cache_resource(show_spinner="Building search index...")
def build_index(file_hash, _chunks, _embeddings):
    """Build the vector store and BM25 retriever and wrap them in a hybrid retriever.

    Args:
        file_hash: Identifier of the uploaded file; the only argument that
            takes part in the cache key.
        _chunks: Chunks to index (underscore prefix keeps it out of hashing).
        _embeddings: Embeddings passed to the vector store together with the
            chunks (underscore prefix keeps it out of hashing).

    Returns:
        A ``HybridRetriever`` over the vector store and the BM25 retriever.
    """
    from src.rag.vector_store import VectorStore
    from src.rag.bm25_search import BM25Retriever
    from src.rag.hybrid_search import HybridRetriever

    vector_store = VectorStore()
    vector_store.build(_embeddings, _chunks)
    keyword_retriever = BM25Retriever(_chunks)
    return HybridRetriever(vector_store, keyword_retriever)


@st.cache_data(show_spinner="Embedding code...")
def embed(file_hash, _texts, _embedder):
    """Return the embeddings ``_embedder`` creates for ``_texts``.

    Only ``file_hash`` takes part in the cache key; the underscore-prefixed
    arguments are excluded from hashing.
    """
    vectors = _embedder.create_embeddings(_texts)
    return vectors


uploaded = st.file_uploader("Upload repository ZIP", type=["zip"])

if uploaded:
    file_bytes = uploaded.getvalue()
    # Content hash of the upload; used as the cache key for embed/build_index.
    file_hash = hashlib.md5(file_bytes).hexdigest()

    models = load_models()
    chunks, n_files = ingest(file_bytes, uploaded.name)

    # Nothing to embed or index: tell the user and end this script run instead
    # of pushing empty input through the embedding and index-building steps.
    if not chunks:
        st.warning(
            f"No definitions were found in the {n_files} scanned files. "
            "Upload a ZIP that contains Python source code."
        )
        st.stop()

    texts = [c["chunk_text"] for c in chunks]
    embeddings = embed(file_hash, texts, models["embedder"])
    hybrid = build_index(file_hash, chunks, embeddings)

    st.success(f"Indexed {len(chunks)} definitions from {n_files} files. Ask away.")

    ask_tab, test_tab = st.tabs(["💬 Ask the codebase", "🧪 Generate tests"])

    with ask_tab:
        # Strip so whitespace-only input does not trigger a search.
        query = st.text_input("Ask about the codebase", placeholder="e.g. where are JWT tokens created?").strip()
        if query:
            with st.spinner("Searching and answering..."):
                # Retrieve 10 candidates, rerank them, and answer from the top 5.
                query_vector = models["embedder"].create_embeddings([query])[0]
                results = hybrid.search(query, query_vector, k=10)
                results = models["reranker"].rerank(query, results)
                result = models["answerer"].answer(query, results[:5])

            st.subheader("Answer")
            st.write(result["answer"])

            st.subheader("Sources")
            for s in result["sources"]:
                label = f"📄 {s['file']}:{s['start_line']}-{s['end_line']}  ·  {s['type']} {s['name']}"
                with st.expander(label):
                    st.code(s["code"], language="python")

    with test_tab:
        st.write("Enter a function or class name and the agent will read its real source and write pytest tests.")
        # Strip so whitespace-only input does not start an agent run.
        target = st.text_input("Function / class name", placeholder="e.g. create_access_token").strip()
        if target and st.button("Generate tests"):
            from src.agent.tools import CodeTools
            from src.agent.workflow import TestAgent
            with st.spinner("Agent reading the code and writing tests..."):
                tools = CodeTools(chunks, models["embedder"], hybrid, models["reranker"])
                tests = TestAgent(tools).generate_tests(target)
            # Remove Markdown code fences from the generated text before display.
            st.code(tests.replace("```python", "").replace("```", "").strip(), language="python")

    with st.expander("Repository overview"):
        # Count chunks per "type" value, in first-seen order.
        by_type = {}
        for c in chunks:
            by_type[c["type"]] = by_type.get(c["type"], 0) + 1
        st.write({"files": n_files, "definitions": len(chunks), **by_type})