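# Spaces: Sleeping
# NOTE(review): the line above appears to be hosting-page status text captured
# together with the source; it is not part of the application — confirm and drop.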
import json
import os
import time
from datetime import datetime
from urllib.parse import quote

import requests
import streamlit as st
def normalize_base_url(raw_url, default="http://localhost:5000"):
    """Return *raw_url* without surrounding whitespace or trailing slashes.

    Args:
        raw_url: Candidate base URL, or ``None`` when nothing was configured.
        default: Value returned when *raw_url* is ``None`` or blank.

    Returns:
        The cleaned base URL, or *default* if nothing usable remains.
    """
    cleaned = (raw_url or "").strip().rstrip("/")
    return cleaned or default


# Base URL of the backend API, overridable through the API_BASE_URL environment
# variable. Endpoints are built as f"{API_BASE_URL}/path", so trailing slashes
# are removed (no "//path" requests) and a blank value falls back to the default.
API_BASE_URL = normalize_base_url(os.environ.get("API_BASE_URL"))
# Page chrome: browser-tab metadata, headline, tagline, and the three
# top-level tabs that the rest of the script fills in.
st.set_page_config(page_title="RAG System - Knowledge Base", page_icon="π§ ", layout="wide")

st.title("π§ RAG System - Web Knowledge Base")
st.markdown("*Ingest web content and query it with AI-powered semantic search*")

tab_labels = ["π₯ Ingest URLs", "π Query Knowledge Base", "π Status Dashboard"]
tab1, tab2, tab3 = st.tabs(tab_labels)
with tab1:
    # Tab 1: submit one URL to the backend's /ingest-url endpoint and keep a
    # per-session list of what has been submitted.
    st.header("Ingest Web Content")
    st.markdown("Submit URLs to be processed and added to the knowledge base.")

    url_input = st.text_input(
        "Enter URL to ingest:",
        placeholder="https://example.com/article",
        key="url_input",
    )

    col1, col2 = st.columns([1, 4])
    with col1:
        ingest_button = st.button("π Ingest URL", type="primary")

    if ingest_button:
        # Pasted URLs often carry stray whitespace; a blank-only value is
        # treated the same as an empty field.
        submitted_url = url_input.strip()
        if not submitted_url:
            st.error("Please enter a URL")
        else:
            try:
                with st.spinner("Submitting URL for processing..."):
                    response = requests.post(
                        f"{API_BASE_URL}/ingest-url",
                        json={"url": submitted_url},
                        timeout=10,
                    )

                if response.status_code == 202:
                    data = response.json()
                    st.success("β URL submitted successfully!")
                    st.info(f"**URL ID:** `{data['url_id']}`")
                    st.info(f"**Status:** {data['status']}")
                    st.markdown(data['message'])

                    # Remember the submission for the "Recently Submitted" list.
                    if 'ingestion_status' not in st.session_state:
                        st.session_state.ingestion_status = []
                    st.session_state.ingestion_status.append({
                        'url_id': data['url_id'],
                        # Fall back to what was sent if the response omits the URL.
                        'url': data.get('url', submitted_url),
                        'submitted_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    })
                else:
                    st.error(f"Error: {response.status_code} - {response.text}")
            except requests.exceptions.ConnectionError:
                # Report the configured endpoint rather than a fixed port, since
                # API_BASE_URL can be overridden through the environment.
                st.error(
                    f"β Cannot connect to API server at {API_BASE_URL}. "
                    "Make sure the FastAPI backend is running."
                )
            except requests.exceptions.Timeout:
                st.error("The API server did not respond within 10 seconds. Please try again.")
            except Exception as e:
                # Last-resort boundary so an unexpected failure is shown in the UI.
                st.error(f"Error: {str(e)}")

    if 'ingestion_status' in st.session_state and st.session_state.ingestion_status:
        st.markdown("---")
        st.subheader("Recently Submitted URLs")

        # Newest first, limited to the five most recent submissions.
        for item in reversed(st.session_state.ingestion_status[-5:]):
            with st.expander(f"π {item['url']}"):
                st.write(f"**URL ID:** `{item['url_id']}`")
                st.write(f"**Submitted:** {item['submitted_at']}")
with tab2:
    # Tab 2: send a question to the backend's /query endpoint and render the
    # answer together with the sources it was based on.
    st.header("Query Knowledge Base")
    st.markdown("Ask questions based on ingested web content.")

    question_input = st.text_area(
        "Enter your question:",
        placeholder="What are the main topics discussed in the ingested articles?",
        height=100,
        key="question_input",
    )

    top_k = st.slider(
        "Number of sources to retrieve:",
        min_value=1,
        max_value=10,
        value=5,
        help="More sources provide more context but may include less relevant information",
    )

    query_button = st.button("π Search & Answer", type="primary")

    if query_button:
        # A whitespace-only question is treated as empty instead of being sent.
        question = question_input.strip()
        if not question:
            st.error("Please enter a question")
        else:
            try:
                with st.spinner("Searching knowledge base and generating answer..."):
                    response = requests.post(
                        f"{API_BASE_URL}/query",
                        json={"question": question, "top_k": top_k},
                        timeout=30,
                    )

                if response.status_code == 200:
                    data = response.json()

                    st.markdown("### π‘ Answer")
                    st.markdown(f"**Question:** {data['question']}")
                    st.markdown("---")
                    st.markdown(data['answer'])

                    st.markdown("---")
                    st.markdown(f"### π Sources ({len(data['sources'])} found)")

                    for i, source in enumerate(data['sources'], 1):
                        with st.expander(f"Source {i} - Relevance: {source['score']:.2%}"):
                            st.markdown(f"**URL:** [{source['url']}]({source['url']})")
                            st.markdown(f"**Relevance Score:** {source['score']:.4f}")
                            st.markdown("**Excerpt:**")
                            st.info(source['text_snippet'])
                elif response.status_code == 500:
                    # A 500 body is not guaranteed to be a JSON object; fall back
                    # to the raw text so the real server message is still shown.
                    try:
                        payload = response.json()
                    except ValueError:
                        payload = None

                    if isinstance(payload, dict):
                        detail = payload.get('detail', 'Unknown error')
                    else:
                        detail = response.text or 'Unknown error'

                    # str() because the detail field may not be a plain string.
                    if "GROQ_API_KEY not configured" in str(detail):
                        st.error("β οΈ Groq API key is not configured. Please set GROQ_API_KEY in your .env file.")
                    else:
                        st.error(f"Server error: {detail}")
                else:
                    st.error(f"Error: {response.status_code} - {response.text}")
            except requests.exceptions.ConnectionError:
                # Report the configured endpoint rather than a fixed port, since
                # API_BASE_URL can be overridden through the environment.
                st.error(
                    f"β Cannot connect to API server at {API_BASE_URL}. "
                    "Make sure the FastAPI backend is running."
                )
            except requests.exceptions.Timeout:
                st.error("The API server did not respond within 30 seconds. Please try again.")
            except Exception as e:
                # Last-resort boundary so an unexpected failure is shown in the UI.
                st.error(f"Error: {str(e)}")
with tab3:
    # Tab 3: on-demand backend health check (left column) and per-URL
    # ingestion status lookup (right column).
    st.header("System Status Dashboard")

    col1, col2 = st.columns(2)

    with col1:
        st.subheader("π₯ Health Check")

        if st.button("Check System Health"):
            try:
                response = requests.get(f"{API_BASE_URL}/health", timeout=5)

                if response.status_code == 200:
                    health_data = response.json()
                    st.success(f"**Status:** {health_data.get('status', 'unknown')}")

                    if health_data.get('redis_connected'):
                        st.success("β Redis: Connected")
                        st.info(f"Queue Length: {health_data.get('queue_length', 'N/A')}")
                    else:
                        st.error("β Redis: Not Connected")
                else:
                    st.error(f"Health check failed: {response.status_code}")
            except requests.exceptions.ConnectionError:
                st.error("β API Server: Not Running")
            except requests.exceptions.Timeout:
                st.error("API Server: no response within 5 seconds")
            except Exception as e:
                # Last-resort boundary so an unexpected failure is shown in the UI.
                st.error(f"Error: {str(e)}")

    with col2:
        st.subheader("π Check URL Status")

        url_id_input = st.text_input(
            "Enter URL ID:",
            placeholder="uuid-here",
            key="url_id_check",
        )

        if st.button("Check Status"):
            url_id = url_id_input.strip()
            if not url_id:
                st.error("Please enter a URL ID")
            else:
                try:
                    # Percent-encode the user-typed ID so characters such as
                    # "/", "?", "#" or spaces cannot alter the request path.
                    encoded_id = quote(url_id, safe="")
                    response = requests.get(
                        f"{API_BASE_URL}/status/{encoded_id}",
                        timeout=5,
                    )

                    if response.status_code == 200:
                        status_data = response.json()
                        state = str(status_data.get('status', 'unknown'))

                        status_color = {
                            'pending': 'π‘',
                            'processing': 'π',
                            'completed': 'β ',
                            'failed': 'β',
                        }.get(state, 'βͺ')

                        st.markdown(f"### {status_color} Status: **{state.upper()}**")
                        st.markdown(f"**URL:** [{status_data['url']}]({status_data['url']})")
                        st.markdown(f"**Created:** {status_data['created_at']}")
                        st.markdown(f"**Updated:** {status_data['updated_at']}")

                        if state == 'completed':
                            # Optional fields are read with .get, matching the
                            # handling of 'error_message' below.
                            st.success(f"β Completed at: {status_data.get('completed_at', 'N/A')}")
                            st.info(f"π Total chunks: {status_data.get('chunk_count', 'N/A')}")
                        elif state == 'failed':
                            st.error(f"Error: {status_data.get('error_message', 'Unknown error')}")
                    elif response.status_code == 404:
                        st.warning("URL ID not found")
                    else:
                        st.error(f"Error: {response.status_code}")
                except requests.exceptions.ConnectionError:
                    st.error("β Cannot connect to API server")
                except requests.exceptions.Timeout:
                    st.error("The API server did not respond within 5 seconds")
                except Exception as e:
                    # Last-resort boundary so an unexpected failure is shown in the UI.
                    st.error(f"Error: {str(e)}")
# Sidebar: static "About" text plus the API endpoint currently in use.
about_markdown = """
### RAG Knowledge Base System
This application uses Retrieval-Augmented Generation (RAG) to:
1. **Ingest** web content from URLs
2. **Process** and chunk the content
3. **Embed** text using sentence-transformers
4. **Store** in Qdrant vector database
5. **Query** with semantic search
6. **Generate** grounded answers via Groq AI
### System Requirements
- Redis (queue management)
- Qdrant (vector database)
- FastAPI backend (port 5000)
- Background worker process
- Groq API key configured
### How to Use
1. **Ingest URLs** - Add web content to knowledge base
2. **Wait for Processing** - Check status dashboard
3. **Query** - Ask questions about the content
*Built with FastAPI, Streamlit, and modern AI technologies*
"""

with st.sidebar:
    st.title("βΉοΈ About")
    st.markdown(about_markdown)
    st.markdown("---")
    st.markdown(f"**API Endpoint:** `{API_BASE_URL}`")
    st.markdown("**Version:** 1.0.0")