# WebRAG / streamlit_app.py
# Arun21102003
# Initial clean commit
# 97f9138
import json
import os
import time
from datetime import datetime
from urllib.parse import quote

import requests
import streamlit as st
# Base URL of the backend API. Override it with the API_BASE_URL environment
# variable. Trailing slashes are dropped so that f"{API_BASE_URL}/endpoint"
# never produces a double slash in the request path.
API_BASE_URL = os.environ.get("API_BASE_URL", "http://localhost:5000").rstrip("/")

# Page-level settings (browser tab title, icon, wide layout).
st.set_page_config(
    page_title="RAG System - Knowledge Base",
    page_icon="🧠",
    layout="wide",
)

st.title("🧠 RAG System - Web Knowledge Base")
st.markdown("*Ingest web content and query it with AI-powered semantic search*")

# One tab per workflow: ingest, query, status. The label emoji were stored as
# mojibake (UTF-8 bytes decoded with the wrong code page); they are restored
# to the intended characters here.
tab1, tab2, tab3 = st.tabs([
    "📥 Ingest URLs",
    "🔍 Query Knowledge Base",
    "📊 Status Dashboard",
])
# --- Tab 1: submit a URL to the backend for ingestion ------------------------
with tab1:
    st.header("Ingest Web Content")
    st.markdown("Submit URLs to be processed and added to the knowledge base.")

    url_input = st.text_input(
        "Enter URL to ingest:",
        placeholder="https://example.com/article",
        key="url_input",
    )

    # Put the button in the narrow first column of a 1:4 split.
    col1, col2 = st.columns([1, 4])
    with col1:
        ingest_button = st.button("🚀 Ingest URL", type="primary")

    if ingest_button:
        # Strip surrounding whitespace so a blank-looking entry is rejected
        # instead of being posted to the API.
        url_to_ingest = url_input.strip()
        if not url_to_ingest:
            st.error("Please enter a URL")
        else:
            try:
                with st.spinner("Submitting URL for processing..."):
                    response = requests.post(
                        f"{API_BASE_URL}/ingest-url",
                        json={"url": url_to_ingest},
                        timeout=10,
                    )

                # The code treats 202 as the only success status.
                if response.status_code == 202:
                    data = response.json()
                    st.success("✅ URL submitted successfully!")
                    st.info(f"**URL ID:** `{data['url_id']}`")
                    st.info(f"**Status:** {data['status']}")
                    st.markdown(data['message'])

                    # Keep a history of submissions in session state so the
                    # "Recently Submitted URLs" list below can show them.
                    if 'ingestion_status' not in st.session_state:
                        st.session_state.ingestion_status = []
                    st.session_state.ingestion_status.append({
                        'url_id': data['url_id'],
                        'url': data['url'],
                        'submitted_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    })
                else:
                    st.error(f"Error: {response.status_code} - {response.text}")
            except requests.exceptions.ConnectionError:
                # Report the configured endpoint rather than a hard-coded port,
                # since API_BASE_URL can be overridden through the environment.
                st.error(
                    "❌ Cannot connect to API server. "
                    f"Make sure FastAPI is running at {API_BASE_URL}."
                )
            except requests.exceptions.Timeout:
                st.error("❌ The API server did not respond within 10 seconds. Please try again.")
            except Exception as e:
                # UI boundary: show anything unexpected (bad JSON, missing
                # keys, ...) instead of crashing the page.
                st.error(f"Error: {str(e)}")

    # Show the five most recent submissions, newest first.
    if 'ingestion_status' in st.session_state and st.session_state.ingestion_status:
        st.markdown("---")
        st.subheader("Recently Submitted URLs")
        for item in reversed(st.session_state.ingestion_status[-5:]):
            with st.expander(f"🔗 {item['url']}"):
                st.write(f"**URL ID:** `{item['url_id']}`")
                st.write(f"**Submitted:** {item['submitted_at']}")
# --- Tab 2: ask a question against the ingested content ----------------------
with tab2:
    st.header("Query Knowledge Base")
    st.markdown("Ask questions based on ingested web content.")

    question_input = st.text_area(
        "Enter your question:",
        placeholder="What are the main topics discussed in the ingested articles?",
        height=100,
        key="question_input",
    )
    top_k = st.slider(
        "Number of sources to retrieve:",
        min_value=1,
        max_value=10,
        value=5,
        help="More sources provide more context but may include less relevant information",
    )
    query_button = st.button("🔍 Search & Answer", type="primary")

    if query_button:
        # A whitespace-only question is treated as empty.
        question = question_input.strip()
        if not question:
            st.error("Please enter a question")
        else:
            try:
                with st.spinner("Searching knowledge base and generating answer..."):
                    response = requests.post(
                        f"{API_BASE_URL}/query",
                        json={
                            "question": question,
                            "top_k": top_k,
                        },
                        timeout=30,
                    )

                if response.status_code == 200:
                    data = response.json()
                    st.markdown("### 💡 Answer")
                    st.markdown(f"**Question:** {data['question']}")
                    st.markdown("---")
                    st.markdown(data['answer'])
                    st.markdown("---")
                    st.markdown(f"### 📚 Sources ({len(data['sources'])} found)")
                    for i, source in enumerate(data['sources'], 1):
                        with st.expander(f"Source {i} - Relevance: {source['score']:.2%}"):
                            st.markdown(f"**URL:** [{source['url']}]({source['url']})")
                            st.markdown(f"**Relevance Score:** {source['score']:.4f}")
                            st.markdown("**Excerpt:**")
                            st.info(source['text_snippet'])
                elif response.status_code == 500:
                    # A 500 body is not guaranteed to be a JSON object (it may
                    # be plain text or HTML), so parse defensively and fall
                    # back to the raw text rather than masking the server
                    # error with a JSON decoding error.
                    try:
                        error_data = response.json()
                    except ValueError:
                        error_data = None
                    detail = error_data.get('detail') if isinstance(error_data, dict) else None
                    if not detail:
                        detail = response.text or 'Unknown error'

                    if "GROQ_API_KEY not configured" in str(detail):
                        st.error("⚠️ Groq API key is not configured. Please set GROQ_API_KEY in your .env file.")
                    else:
                        st.error(f"Server error: {detail}")
                else:
                    st.error(f"Error: {response.status_code} - {response.text}")
            except requests.exceptions.ConnectionError:
                # Report the configured endpoint rather than a hard-coded port,
                # since API_BASE_URL can be overridden through the environment.
                st.error(
                    "❌ Cannot connect to API server. "
                    f"Make sure FastAPI is running at {API_BASE_URL}."
                )
            except requests.exceptions.Timeout:
                st.error("❌ The API server did not respond within 30 seconds. Please try again.")
            except Exception as e:
                # UI boundary: show anything unexpected (bad JSON, missing
                # keys, ...) instead of crashing the page.
                st.error(f"Error: {str(e)}")
# --- Tab 3: backend health and per-URL ingestion status ----------------------
with tab3:
    st.header("System Status Dashboard")
    col1, col2 = st.columns(2)

    # Left column: on-demand health check of the API.
    with col1:
        st.subheader("🏥 Health Check")
        if st.button("Check System Health"):
            try:
                response = requests.get(f"{API_BASE_URL}/health", timeout=5)
                if response.status_code == 200:
                    health_data = response.json()
                    st.success(f"**Status:** {health_data['status']}")
                    if health_data.get('redis_connected'):
                        st.success("✅ Redis: Connected")
                        st.info(f"Queue Length: {health_data.get('queue_length', 'N/A')}")
                    else:
                        st.error("❌ Redis: Not Connected")
                else:
                    st.error(f"Health check failed: {response.status_code}")
            except requests.exceptions.ConnectionError:
                st.error("❌ API Server: Not Running")
            except Exception as e:
                st.error(f"Error: {str(e)}")

    # Right column: look up the processing state of one submitted URL.
    with col2:
        st.subheader("🔍 Check URL Status")
        url_id_input = st.text_input(
            "Enter URL ID:",
            placeholder="uuid-here",
            key="url_id_check",
        )
        if st.button("Check Status"):
            url_id = url_id_input.strip()
            if not url_id:
                st.error("Please enter a URL ID")
            else:
                try:
                    # Percent-encode the user-supplied ID (including "/") so it
                    # stays a single path segment and cannot redirect the
                    # request to another endpoint or add a query string.
                    response = requests.get(
                        f"{API_BASE_URL}/status/{quote(url_id, safe='')}",
                        timeout=5,
                    )
                    if response.status_code == 200:
                        status_data = response.json()
                        status = status_data['status']
                        # Icon per known status; unknown values get a neutral
                        # white circle.
                        status_color = {
                            'pending': '🟡',
                            'processing': '🔄',
                            'completed': '✅',
                            'failed': '❌',
                        }.get(status, '⚪')
                        st.markdown(f"### {status_color} Status: **{status.upper()}**")
                        st.markdown(f"**URL:** [{status_data['url']}]({status_data['url']})")
                        st.markdown(f"**Created:** {status_data['created_at']}")
                        st.markdown(f"**Updated:** {status_data['updated_at']}")
                        if status == 'completed':
                            # Optional fields: fall back to "N/A" instead of
                            # failing the whole status view on a missing key.
                            st.success(f"✅ Completed at: {status_data.get('completed_at', 'N/A')}")
                            st.info(f"📊 Total chunks: {status_data.get('chunk_count', 'N/A')}")
                        elif status == 'failed':
                            # `or` also covers a present-but-null error_message.
                            st.error(f"Error: {status_data.get('error_message') or 'Unknown error'}")
                    elif response.status_code == 404:
                        st.warning("URL ID not found")
                    else:
                        st.error(f"Error: {response.status_code}")
                except requests.exceptions.ConnectionError:
                    st.error("❌ Cannot connect to API server")
                except Exception as e:
                    st.error(f"Error: {str(e)}")
# Static "About" text for the sidebar, held in a constant so the layout code
# below stays short.
_ABOUT_MARKDOWN = """
### RAG Knowledge Base System
This application uses Retrieval-Augmented Generation (RAG) to:
1. **Ingest** web content from URLs
2. **Process** and chunk the content
3. **Embed** text using sentence-transformers
4. **Store** in Qdrant vector database
5. **Query** with semantic search
6. **Generate** grounded answers via Groq AI
### System Requirements
- Redis (queue management)
- Qdrant (vector database)
- FastAPI backend (port 5000)
- Background worker process
- Groq API key configured
### How to Use
1. **Ingest URLs** - Add web content to knowledge base
2. **Wait for Processing** - Check status dashboard
3. **Query** - Ask questions about the content
*Built with FastAPI, Streamlit, and modern AI technologies*
"""

# Sidebar: about text, then the configured API endpoint and app version.
with st.sidebar:
    st.title("ℹ️ About")
    st.markdown(_ABOUT_MARKDOWN)
    st.markdown("---")
    st.markdown(f"**API Endpoint:** `{API_BASE_URL}`")
    st.markdown("**Version:** 1.0.0")