Spaces:
Sleeping
Sleeping
File size: 3,902 Bytes
20e5327 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
import subprocess
import time

# Launch the FastAPI backend as a sibling process; the Streamlit UI below
# talks to it over HTTP on port 8000. (Popen handle deliberately not kept:
# the backend should outlive this script's reruns.)
subprocess.Popen(
    ["uvicorn", "backend.main:app", "--host", "0.0.0.0", "--port", "8000"]
)

# Wait until the backend actually accepts TCP connections instead of a blind
# fixed sleep (the original `time.sleep(2)` races slow cold starts). Poll the
# port for up to ~15s, then proceed regardless so the UI still loads and can
# surface backend errors itself.
import socket

_deadline = time.monotonic() + 15.0
while time.monotonic() < _deadline:
    try:
        with socket.create_connection(("127.0.0.1", 8000), timeout=1.0):
            break  # backend is up
    except OSError:
        time.sleep(0.25)  # not listening yet; retry shortly
import streamlit as st
import requests
import time  # NOTE(review): `time` is already imported above for the backend boot; duplicate is harmless

# Base URL of the FastAPI backend started at the top of this script.
API_URL = "http://0.0.0.0:8000"

# Wide layout suits the tabs + context expander used below.
st.set_page_config(page_title="GenAI RAG App (Docs + Code)", layout="wide")
st.title("🧠 GenAI RAG App (Docs + Code)")
# ---------------- Chat Memory ----------------
# The conversation transcript lives in session state so it survives reruns.
st.session_state.setdefault("history", [])

# ---------------- Sidebar Settings ----------------
with st.sidebar:
    st.header("Settings")
    # Which knowledge base the backend should query: documents or source code.
    domain = st.radio("Knowledge Base:", ["doc", "code"])
    # Wipe the transcript only; uploaded indexes are untouched.
    if st.button("Clear Chat"):
        st.session_state["history"] = []
# ---------------- Tabs ----------------
upload_tab, query_tab = st.tabs(["⬆ Upload", "💬 Chat"])

# ---------------- Upload Tab ----------------
with upload_tab:
    st.subheader("Upload File")
    file = st.file_uploader("Upload PDF / TXT / Python code", type=["pdf", "txt", "md", "py", "js", "ts"])
    if file:
        # Route by extension: document formats go to the doc index, code
        # formats (py/js/ts) to the code index. Lower-case first so e.g.
        # "REPORT.PDF" is not mis-routed to the code endpoint — the original
        # comparison was case-sensitive.
        ext = file.name.split(".")[-1].lower()
        if ext in ("pdf", "txt", "md"):
            endpoint = "/upload/doc"
        else:
            endpoint = "/upload/code"
        files = {"file": (file.name, file.getvalue())}
        with st.spinner("Indexing file..."):
            try:
                # Bounded timeout so a wedged backend cannot hang the UI
                # forever; connection errors degrade to the error banner
                # instead of an uncaught traceback.
                r = requests.post(API_URL + endpoint, files=files, timeout=120)
            except requests.RequestException:
                r = None
        if r is not None and r.status_code == 200:
            st.success("File Indexed!")
            st.json(r.json())
        else:
            st.error("Upload Failed! Check FastAPI backend.")
# ---------------- Helper: Streaming Output ----------------
def stream_text(text, delay=0.025):
    """Simulate LLM token streaming.

    Yields each whitespace-separated word of ``text`` with a trailing
    space, sleeping ``delay`` seconds after every word is emitted.
    """
    words = text.split()
    for word in words:
        yield f"{word} "
        time.sleep(delay)
# ---------------- Query / Chat Tab ----------------
with query_tab:
    st.subheader("Ask a Question")

    # Replay the running transcript on every rerun.
    for msg in st.session_state["history"]:
        role = "You" if msg["role"] == "user" else "AI"
        st.markdown(f"**{role}:** {msg['text']}")

    # Mode choices depend on the knowledge base picked in the sidebar.
    if domain == "code":
        mode = st.selectbox("Mode:", [
            "Explain",
            "Debug",
            "Fix",
            "Refactor",
            "Add Comments",
            "Complexity"
        ])
    else:
        mode = st.selectbox("Mode:", [
            "Answer",
            "Summarize",
            "Bullet Points",
            "Keywords"
        ])

    question = st.text_input("Enter your question")

    if st.button("Ask"):
        if not question.strip():
            st.warning("Enter a question before submitting.")
        else:
            payload = {"question": question, "domain": domain, "mode": mode}
            with st.spinner("Thinking..."):
                try:
                    # Bounded timeout so a wedged backend cannot hang the UI;
                    # connection errors fall through to the error banner
                    # instead of an uncaught traceback.
                    r = requests.post(API_URL + "/query", json=payload, timeout=120)
                except requests.RequestException:
                    r = None
            if r is not None and r.status_code == 200:
                data = r.json()
                answer = data["answer"]
                # A missing "context" key should not crash the page.
                context = data.get("context", "")

                # Persist both turns so they replay on the next rerun.
                st.session_state["history"].append({"role": "user", "text": question})
                st.session_state["history"].append({"role": "assistant", "text": answer})

                # Stream the answer word-by-word with a cursor glyph.
                st.success("AI:")
                placeholder = st.empty()
                full = ""
                for token in stream_text(answer):
                    full += token
                    placeholder.markdown(full + "▌")
                # BUGFIX: render once more without the "▌" cursor — the
                # original loop left the cursor glyph on screen permanently.
                placeholder.markdown(full)

                # RAG Context section
                with st.expander("🔍 Retrieved Context (RAG)"):
                    if domain == "code":
                        st.code(context)
                    else:
                        st.text(context)
            else:
                st.error("Query Failed! Check backend or logs.")
|