import streamlit as st
import os
from pathlib import Path
import time
from typing import List, Dict, Any
from datetime import datetime
import google.generativeai as genai
from vector_store import VectorStore
from admin import AdminPanel
from config import Config
from utils import validate_api_key, format_response, log_interaction
# Page configuration
# NOTE: st.set_page_config must be the first Streamlit call in the script;
# the sidebar starts collapsed so the chat interface gets the full width.
st.set_page_config(
page_title="BLUESCARF AI - HR Assistant",
page_icon="🔷",
layout="wide",
initial_sidebar_state="collapsed"
)
# Custom CSS for enhanced UX and professional styling
# NOTE(review): the style payload below is an empty string — the CSS content
# appears to have been stripped; confirm against the original file.
st.markdown("""
""", unsafe_allow_html=True)
class HRAssistant:
    """Streamlit chat application answering HR questions for BLUESCARF AI.

    Implements a RAG pipeline: relevant document chunks are retrieved from a
    vector store and fed to the Google Gemini API, with an admin panel for
    maintaining the knowledge base.
    """

    def __init__(self):
        # Project-local collaborators: configuration, document retrieval,
        # and the knowledge-base administration UI.
        self.config = Config()
        self.vector_store = VectorStore()
        self.admin_panel = AdminPanel()

    def initialize_session_state(self):
        """Initialize session state variables (idempotent across reruns)."""
        if 'messages' not in st.session_state:
            st.session_state.messages = []
        if 'api_key_validated' not in st.session_state:
            st.session_state.api_key_validated = False
        if 'show_admin' not in st.session_state:
            st.session_state.show_admin = False
        if 'admin_authenticated' not in st.session_state:
            st.session_state.admin_authenticated = False

    def render_header(self):
        """Render application header with logo."""
        st.markdown("""
BLUESCARF ARTIFICIAL INTELLIGENCE
HR Assistant
""", unsafe_allow_html=True)
        # Logo placeholder - replace logo.png with actual company logo
        logo_path = Path("logo.png")
        if logo_path.exists():
            st.image("logo.png", width=200)
        else:
            st.info("📋 Replace 'logo.png' with your company logo")

    def setup_gemini_api(self, api_key: str) -> bool:
        """Configure the Gemini API with the provided key.

        On success the validated model is cached in session state and True
        is returned; any failure is surfaced via st.error and False returned.
        """
        try:
            if not validate_api_key(api_key):
                return False
            genai.configure(api_key=api_key)
            # Issue a throwaway request so an invalid key fails here,
            # not on the user's first real question.
            model = genai.GenerativeModel('gemini-1.5-flash')
            model.generate_content("Hello")
            st.session_state.api_key_validated = True
            st.session_state.model = model
            return True
        except Exception as e:
            st.error(f"API Configuration Error: {str(e)}")
            return False

    def get_relevant_context(self, query: str) -> List[Dict[str, Any]]:
        """Retrieve relevant context from the vector store (public wrapper)."""
        return self._retrieve_relevant_context(query)

    def generate_response(self, query: str, context: List[Dict[str, Any]]) -> str:
        """Generate a Gemini response from retrieved context (public wrapper)."""
        return self._generate_contextual_response(query, context)

    def is_hr_related_query(self, query: str) -> bool:
        """Check if query is HR-related using enhanced classification.

        FIX: the original contained unreachable code after this return
        (a log_interaction call referencing an undefined ``response``
        variable); that dead code has been removed.
        """
        return self._is_hr_related_query(query)

    def render_chat_interface(self):
        """Render the main chat interface with robust state management."""
        st.markdown("### 💬 Chat with HR Assistant")
        # Input-state flags guard against reprocessing the same message
        # when Streamlit reruns the script.
        if 'input_processed' not in st.session_state:
            st.session_state.input_processed = False
        if 'last_input' not in st.session_state:
            st.session_state.last_input = ""
        self._render_chat_messages()
        self._render_chat_input()
        self._render_chat_controls()

    def _render_chat_messages(self):
        """Render chat message history."""
        if not st.session_state.messages:
            st.info("👋 Welcome! Ask me anything about BLUESCARF AI HR policies and procedures.")
            return
        # Create scrollable chat container
        with st.container():
            for message in st.session_state.messages:
                if message["role"] == "user":
                    st.markdown(f"""
You: {message["content"]}
""", unsafe_allow_html=True)
                else:
                    st.markdown(f"""
HR Assistant: {message["content"]}
""", unsafe_allow_html=True)

    def _render_chat_input(self):
        """Render chat input with state management to prevent rerun loops."""
        col1, col2 = st.columns([5, 1])
        with col1:
            # The key changes with the message count so the text box resets
            # to empty after each successful send.
            input_key = f"chat_input_{len(st.session_state.messages)}"
            user_input = st.text_input(
                "Ask me about company policies, benefits, procedures...",
                key=input_key,
                placeholder="Type your HR question here...",
                value=""  # Always start with empty value
            )
        with col2:
            send_button = st.button("Send", type="primary", key=f"send_{len(st.session_state.messages)}")
        # Process input with anti-loop protection
        if send_button and user_input and user_input.strip():
            # Skip duplicate submissions of the exact same text.
            if user_input != st.session_state.last_input or not st.session_state.input_processed:
                self._process_user_query(user_input.strip())
                st.session_state.last_input = user_input.strip()
                st.session_state.input_processed = True
                # Rerun so the new messages appear immediately.
                st.rerun()
            else:
                st.warning("⚠️ Query already processed. Please ask a new question.")
        # Reset the duplicate-guard once the user types something new.
        if user_input != st.session_state.last_input:
            st.session_state.input_processed = False

    def _render_chat_controls(self):
        """Render chat control buttons (clear / export / message count)."""
        if not st.session_state.messages:
            return
        col1, col2, col3 = st.columns([2, 2, 2])
        with col1:
            if st.button("🗑️ Clear Chat", key="clear_chat_btn"):
                self._clear_chat_session()
        with col2:
            if st.button("📥 Export Chat", key="export_chat_btn"):
                self._export_chat_history()
        with col3:
            st.caption(f"💬 {len(st.session_state.messages)} messages")

    def _process_user_query(self, query: str):
        """Append the user message, generate a reply, and log the outcome."""
        if not query or len(query.strip()) < 3:
            st.warning("⚠️ Please enter a meaningful question.")
            return
        # Add user message to chat history
        st.session_state.messages.append({
            "role": "user",
            "content": query,
            "timestamp": time.time(),
            "message_id": self._generate_message_id()
        })
        try:
            with st.spinner("🤔 Thinking..."):
                response = self._generate_intelligent_response(query)
            # Add assistant response to chat history
            st.session_state.messages.append({
                "role": "assistant",
                "content": response,
                "timestamp": time.time(),
                "message_id": self._generate_message_id(),
                "query_processed": query
            })
            self._log_successful_interaction(query, response)
        except Exception as e:
            # Surface a friendly message in-chat rather than crashing the app.
            error_response = f"I apologize, but I encountered an error processing your request: {str(e)}. Please try rephrasing your question."
            st.session_state.messages.append({
                "role": "assistant",
                "content": error_response,
                "timestamp": time.time(),
                "message_id": self._generate_message_id(),
                "error": True
            })
            # Log error for debugging
            self._log_error_interaction(query, str(e))

    def _generate_intelligent_response(self, query: str) -> str:
        """Run the RAG pipeline: scope check -> retrieval -> generation."""
        if not self._is_hr_related_query(query):
            return self._get_scope_redirect_message()
        context_chunks = self._retrieve_relevant_context(query)
        if not context_chunks:
            return self._get_no_context_message()
        return self._generate_contextual_response(query, context_chunks)

    def _retrieve_relevant_context(self, query: str) -> List[Dict[str, Any]]:
        """Retrieve relevant context chunks, returning [] on any failure."""
        try:
            return self.vector_store.similarity_search(
                query,
                k=self.config.MAX_CONTEXT_CHUNKS
            )
        except Exception as e:
            st.error(f"Context retrieval error: {str(e)}")
            return []

    def _generate_contextual_response(self, query: str, context: List[Dict[str, Any]]) -> str:
        """Generate a grounded answer with the Gemini model in session state."""
        try:
            context_text = self._format_context_for_prompt(context)
            prompt = self._build_contextual_prompt(query, context_text)
            response = st.session_state.model.generate_content(prompt)
            return self._format_and_validate_response(response.text)
        except Exception as e:
            return f"I apologize, but I encountered an error generating a response: {str(e)}. Please try rephrasing your question."

    def _format_context_for_prompt(self, context: List[Dict[str, Any]]) -> str:
        """Format context chunks as numbered, source-attributed sections."""
        formatted_sections = []
        for idx, chunk in enumerate(context, 1):
            source = chunk['metadata'].get('source', 'Company Document')
            content = chunk['content']
            formatted_sections.append(
                f"[Document {idx}: {source}]\n{content}\n"
            )
        return "\n".join(formatted_sections)

    def _build_contextual_prompt(self, query: str, context_text: str) -> str:
        """Build the grounded prompt sent to the Gemini API."""
        system_context = self.config.get_hr_context_prompt()
        return f"""{system_context}
COMPANY DOCUMENT CONTEXT:
{context_text}
USER QUESTION: {query}
RESPONSE GUIDELINES:
- Answer based ONLY on the provided company documents
- Be specific and reference relevant policies
- If information is incomplete, state what's available and suggest contacting HR
- Maintain professional, helpful tone
- Provide actionable guidance when possible
RESPONSE:"""

    def _format_and_validate_response(self, response_text: str) -> str:
        """Format and validate the AI response before display."""
        if not response_text or len(response_text.strip()) < 10:
            return "I apologize, but I couldn't generate a meaningful response. Please try rephrasing your question."
        formatted_response = self._enhance_response_formatting(response_text.strip())
        # Append a contact-HR footer only to substantial answers.
        if len(formatted_response) > 150:
            formatted_response += "\n\n*For additional assistance, please contact the HR department.*"
        return formatted_response

    def _enhance_response_formatting(self, text: str) -> str:
        """Strip boilerplate lead-ins and normalize sentence spacing."""
        # Remove AI response artifacts
        cleaned = text.replace("Based on the provided documents,", "")
        cleaned = cleaned.replace("According to the company policies,", "")
        # Ensure proper sentence spacing
        sentences = cleaned.split('. ')
        return '. '.join(sentence.strip() for sentence in sentences if sentence.strip())

    def _is_hr_related_query(self, query: str) -> bool:
        """Classify a query as HR-related via case-insensitive keyword match.

        NOTE(review): substring matching means short tokens like 'hr' can
        fire inside unrelated words (e.g. "three"); acceptable for a coarse
        scope filter, but worth confirming.
        """
        hr_indicators = [
            'policy', 'leave', 'vacation', 'sick', 'holiday', 'benefit', 'insurance',
            'salary', 'compensation', 'promotion', 'performance', 'review', 'training',
            'onboarding', 'handbook', 'procedure', 'guideline', 'hr', 'human resources',
            'employee', 'staff', 'team', 'department', 'work', 'job', 'role',
            'resignation', 'termination', 'disciplinary', 'conduct', 'harassment'
        ]
        query_lower = query.lower()
        return any(indicator in query_lower for indicator in hr_indicators)

    def _get_scope_redirect_message(self) -> str:
        """Get polite redirect message for non-HR queries."""
        return ("I'm specifically designed to assist with BLUESCARF AI HR-related questions "
                "using our company policies and documents. Please ask me about company "
                "policies, benefits, leave procedures, or other HR matters.")

    def _get_no_context_message(self) -> str:
        """Get message when no relevant context is found."""
        return ("I couldn't find relevant information in our company documents for your "
                "question. Please contact HR directly for assistance, or try rephrasing "
                "your question using different terms.")

    def _clear_chat_session(self):
        """Clear chat session with proper state reset."""
        st.session_state.messages = []
        st.session_state.input_processed = False
        st.session_state.last_input = ""
        st.success("🗑️ Chat history cleared!")
        st.rerun()

    def _export_chat_history(self):
        """Offer the chat transcript as a plain-text download."""
        if not st.session_state.messages:
            st.warning("No chat history to export.")
            return
        # Create exportable format
        export_content = "BLUESCARF AI HR Assistant - Chat Export\n"
        export_content += f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
        for message in st.session_state.messages:
            role = "You" if message["role"] == "user" else "HR Assistant"
            timestamp = datetime.fromtimestamp(message["timestamp"]).strftime('%H:%M:%S')
            export_content += f"[{timestamp}] {role}: {message['content']}\n\n"
        # NOTE(review): a download button created inside a button-click branch
        # disappears on the next rerun; consider rendering it unconditionally.
        st.download_button(
            label="📥 Download Chat History",
            data=export_content,
            file_name=f"hr_chat_export_{int(time.time())}.txt",
            mime="text/plain"
        )

    def _generate_message_id(self) -> str:
        """Generate unique message identifier (millisecond epoch + index)."""
        return f"msg_{int(time.time() * 1000)}_{len(st.session_state.messages)}"

    def _log_successful_interaction(self, query: str, response: str):
        """Log successful interaction for analytics; never raises."""
        try:
            log_interaction(query, response, {
                'success': True,
                'response_length': len(response),
                'session_messages': len(st.session_state.messages)
            })
        except Exception:
            pass  # Silent fail for logging

    def _log_error_interaction(self, query: str, error: str):
        """Log error interaction for debugging; never raises."""
        try:
            log_interaction(query, f"ERROR: {error}", {
                'success': False,
                'error_type': 'processing_error',
                'session_messages': len(st.session_state.messages)
            })
        except Exception:
            pass  # Silent fail for logging

    def render_admin_section(self):
        """Render admin panel toggle and, when open, the panel itself."""
        st.markdown("---")
        col1, col2 = st.columns([3, 1])
        with col1:
            st.markdown("### 🔧 Administrator Panel")
            st.markdown("*Manage knowledge base and update company documents*")
        with col2:
            if st.button("Admin Access"):
                st.session_state.show_admin = not st.session_state.show_admin
        if st.session_state.show_admin:
            self.admin_panel.render()

    def render_footer(self):
        """Render application footer.

        NOTE(review): footer HTML appears stripped (empty string) — confirm
        against the original file.
        """
        st.markdown("""
""", unsafe_allow_html=True)

    def run(self):
        """Main application entry point."""
        self.initialize_session_state()
        self.render_header()
        # API Key input — gate the chat UI behind a validated Gemini key.
        if not st.session_state.api_key_validated:
            st.markdown("### 🔑 API Configuration")
            with st.form("api_key_form"):
                api_key = st.text_input(
                    "Enter your Google Gemini API Key:",
                    type="password",
                    help="Get your API key from https://makersuite.google.com/app/apikey"
                )
                submitted = st.form_submit_button("Connect", type="primary")
                if submitted and api_key:
                    with st.spinner("Validating API key..."):
                        if self.setup_gemini_api(api_key):
                            st.success("✅ API key validated successfully!")
                            st.rerun()
                        else:
                            st.error("❌ Invalid API key. Please check and try again.")
            # Show knowledge base status
            doc_count = self.vector_store.get_document_count()
            if doc_count > 0:
                st.info(f"📚 Knowledge base contains {doc_count} processed documents")
            else:
                st.warning("⚠️ No documents in knowledge base. Please use admin panel to add company documents.")
        else:
            # Main application interface
            self.render_chat_interface()
            self.render_admin_section()
            self.render_footer()
def main():
    """Application entry point: build the assistant and launch the app."""
    HRAssistant().run()


if __name__ == "__main__":
    main()