Spaces:
Sleeping
Sleeping
| import subprocess | |
| import time | |
# start backend
def _ensure_backend_running(host="127.0.0.1", port=8000, timeout=15.0):
    """Launch the FastAPI backend if needed and wait until it accepts connections.

    Streamlit re-executes this script from top to bottom on every widget
    interaction. An unconditional ``Popen`` would therefore try to spawn a
    new uvicorn process (which fails on the already-bound port) and pay the
    start-up delay on each rerun. The port is probed first, and a process is
    started only when nothing is listening yet.

    Args:
        host: Address used to probe the backend (loopback by default).
        port: TCP port the backend listens on.
        timeout: Maximum number of seconds to wait for the backend to come
            up after it has been launched.

    Returns:
        None. If the backend is still not reachable after ``timeout``
        seconds the script continues anyway, matching the original
        best-effort behaviour.
    """
    import socket  # local import: only this helper needs it

    def _is_listening():
        # A successful TCP connect means some server already owns the port.
        try:
            with socket.create_connection((host, port), timeout=0.5):
                return True
        except OSError:
            return False

    if _is_listening():
        return

    subprocess.Popen(
        ["uvicorn", "backend.main:app", "--host", "0.0.0.0", "--port", str(port)]
    )

    # Poll for readiness instead of sleeping a fixed amount: returns as soon
    # as the backend is up, and tolerates boots slower than two seconds.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if _is_listening():
            return
        time.sleep(0.2)


_ensure_backend_running()
| import streamlit as st | |
| import requests | |
| import time | |
# Base URL of the FastAPI backend. The server *binds* to 0.0.0.0 (all
# interfaces), but 0.0.0.0 is not a portable address to *connect* to: some
# platforms reject it outright. Clients should use the loopback address.
API_URL = "http://127.0.0.1:8000"

# Page-level configuration and the main heading.
st.set_page_config(page_title="GenAI RAG App (Docs + Code)", layout="wide")
st.title("🧠 GenAI RAG App (Docs + Code)")
# ---------------- Chat Memory ----------------
# Seed the per-session conversation log on the first run of the script;
# later reruns find the key already present and leave it untouched.
history_missing = "history" not in st.session_state
if history_missing:
    st.session_state.history = []
# ---------------- Sidebar Settings ----------------
# Widgets are attached to the sidebar through object notation rather than a
# ``with`` block; the rendered result is the same.
sidebar = st.sidebar
sidebar.header("Settings")

# Which knowledge base the chat tab should query.
domain = sidebar.radio("Knowledge Base:", ["doc", "code"])

# Wipe the stored conversation when the user asks for it.
clear_requested = sidebar.button("Clear Chat")
if clear_requested:
    st.session_state["history"] = []
# ---------------- Tabs ----------------
upload_tab, query_tab = st.tabs(["⬆ Upload", "💬 Chat"])

# ---------------- Upload Tab ----------------
# Extensions routed to the document index; everything else goes to the code index.
DOC_EXTENSIONS = {"pdf", "txt", "md"}
# Indexing can be slow, so the timeout is generous, but a hung backend must
# not block the UI forever.
UPLOAD_TIMEOUT_S = 300

with upload_tab:
    st.subheader("Upload File")
    file = st.file_uploader(
        "Upload PDF / TXT / Python code",
        type=["pdf", "txt", "md", "py", "js", "ts"],
    )

    if file:
        # Lower-case the extension so "REPORT.PDF" is still treated as a document.
        ext = file.name.rsplit(".", 1)[-1].lower()
        endpoint = "/upload/doc" if ext in DOC_EXTENSIONS else "/upload/code"

        # The uploader keeps returning the same file on every script rerun
        # (any widget interaction triggers one). Remember what has already
        # been indexed so the file is not re-sent to the backend each time.
        fingerprint = (file.name, file.size)

        if st.session_state.get("indexed_upload") != fingerprint:
            files = {"file": (file.name, file.getvalue())}
            try:
                with st.spinner("Indexing file..."):
                    r = requests.post(
                        API_URL + endpoint, files=files, timeout=UPLOAD_TIMEOUT_S
                    )
            except requests.RequestException as exc:
                # Backend down, unreachable or timed out: show a message
                # instead of crashing the page with a traceback.
                st.error(f"Upload Failed! Could not reach the backend: {exc}")
            else:
                if r.status_code == 200:
                    st.session_state["indexed_upload"] = fingerprint
                    st.session_state["upload_result"] = r.json()
                else:
                    st.error("Upload Failed! Check FastAPI backend.")

        # Shown right after a successful upload and on later reruns while the
        # same file is still selected.
        if st.session_state.get("indexed_upload") == fingerprint:
            st.success("File Indexed!")
            st.json(st.session_state["upload_result"])
# ---------------- Helper: Streaming Output ----------------
def stream_text(text, delay=0.025):
    """Yield *text* piece by piece to simulate streaming.

    Each yielded piece is a word together with the whitespace that follows
    it, so concatenating every piece reproduces *text* exactly. Splitting on
    whitespace and re-joining with single spaces would collapse newlines and
    indentation, which breaks Markdown lists, paragraphs and code blocks in
    the rendered answer.

    Args:
        text: The full string to stream.
        delay: Seconds to pause after each piece.

    Yields:
        Consecutive substrings of *text*; nothing for an empty string.
    """
    import re  # local import: only this helper needs it

    # "\S+\s*" -> a word plus its trailing whitespace;
    # "\s+"    -> leading whitespace that precedes the first word.
    for token in re.findall(r"\S+\s*|\s+", text):
        yield token
        time.sleep(delay)
# ---------------- Query / Chat Tab ----------------
# Modes offered for each knowledge base.
CODE_MODES = ["Explain", "Debug", "Fix", "Refactor", "Add Comments", "Complexity"]
DOC_MODES = ["Answer", "Summarize", "Bullet Points", "Keywords"]
# Upper bound on how long a single query may block the UI.
QUERY_TIMEOUT_S = 120

with query_tab:
    st.subheader("Ask a Question")

    # Show chat history
    for msg in st.session_state["history"]:
        role = "You" if msg["role"] == "user" else "AI"
        st.markdown(f"**{role}:** {msg['text']}")

    # Mode selection based on domain
    mode = st.selectbox("Mode:", CODE_MODES if domain == "code" else DOC_MODES)

    question = st.text_input("Enter your question")

    if st.button("Ask"):
        if not question.strip():
            st.warning("Enter a question before submitting.")
        else:
            payload = {"question": question, "domain": domain, "mode": mode}
            try:
                with st.spinner("Thinking..."):
                    r = requests.post(
                        API_URL + "/query", json=payload, timeout=QUERY_TIMEOUT_S
                    )
            except requests.RequestException as exc:
                # Backend down, unreachable or timed out: show a message
                # instead of crashing the page with a traceback.
                st.error(f"Query Failed! Could not reach the backend: {exc}")
            else:
                if r.status_code == 200:
                    data = r.json()
                    answer = data["answer"]
                    context = data["context"]

                    # Save history
                    st.session_state["history"].append({"role": "user", "text": question})
                    st.session_state["history"].append({"role": "assistant", "text": answer})

                    # Stream output: redraw the placeholder with the text so
                    # far plus a cursor glyph after every token.
                    st.success("AI:")
                    placeholder = st.empty()
                    full = ""
                    for token in stream_text(answer):
                        full += token
                        placeholder.markdown(full + "▌")
                    # Final redraw without the cursor; otherwise the "▌"
                    # stays glued to the end of the finished answer.
                    placeholder.markdown(full)

                    # RAG Context section
                    with st.expander("🔍 Retrieved Context (RAG)"):
                        if domain == "code":
                            st.code(context)
                        else:
                            st.text(context)
                else:
                    st.error("Query Failed! Check backend or logs.")