Spaces:
Sleeping
Sleeping
| import re | |
| import time | |
| import json | |
| import logging | |
| from typing import Any, Dict, List, Optional, Union, Tuple | |
| from pathlib import Path | |
| import streamlit as st | |
| from datetime import datetime, timedelta | |
| import hashlib | |
| import uuid | |
| from config import Config | |
class InteractionLogger:
    """Advanced logging system for user interactions and system monitoring."""

    def __init__(self, config: "Config"):
        self.config = config
        self.logger = self._setup_logger()
        # Interactions are appended as JSON Lines next to the main log file.
        self.interaction_log_path = config.LOG_FILE_PATH.parent / "interactions.jsonl"

    def _setup_logger(self) -> logging.Logger:
        """Configure professional logging with rotation and formatting.

        Returns:
            The shared "hr_assistant" logger, with handlers attached once.
        """
        logger = logging.getLogger("hr_assistant")
        logger.setLevel(getattr(logging, self.config.LOG_LEVEL))

        # Prevent duplicate handlers (e.g. on module re-import / app reruns).
        if not logger.handlers:
            from logging.handlers import RotatingFileHandler

            # Hoist the config lookup instead of re-fetching it per setting.
            log_cfg = self.config.get_logging_config()
            formatter = logging.Formatter(log_cfg['log_format'])

            # File handler with size-based rotation.
            file_handler = RotatingFileHandler(
                self.config.LOG_FILE_PATH,
                maxBytes=log_cfg['max_file_size'],
                backupCount=log_cfg['backup_count'],
            )
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

            # Console handler for development. Fix: the original never
            # attached the formatter here, so console lines were unformatted.
            if log_cfg['console_output']:
                console_handler = logging.StreamHandler()
                console_handler.setLevel(logging.INFO)
                console_handler.setFormatter(formatter)
                logger.addHandler(console_handler)

        return logger

    def log_interaction(self, query: str, response: str, metadata: Optional[Dict] = None):
        """Log user interactions for analysis and improvement.

        Args:
            query: User's question or input.
            response: System's response text.
            metadata: Optional extra context merged into the record.
        """
        if not self.config.ENABLE_INTERACTION_LOGGING:
            return

        interaction_data = {
            'timestamp': time.time(),
            'session_id': self._get_session_id(),
            'query': query,
            'response_length': len(response),
            'query_length': len(query),
            'query_type': self._classify_query(query),
            'metadata': metadata or {}
        }

        # Best-effort append; logging must never crash the assistant.
        try:
            self.interaction_log_path.parent.mkdir(parents=True, exist_ok=True)
            with open(self.interaction_log_path, 'a') as f:
                f.write(json.dumps(interaction_data) + '\n')
        except Exception as e:
            self.logger.warning(f"Failed to log interaction: {str(e)}")

    def _get_session_id(self) -> str:
        """Generate or retrieve session identifier for tracking."""
        if 'session_id' not in st.session_state:
            st.session_state.session_id = str(uuid.uuid4())[:8]
        return st.session_state.session_id

    def _classify_query(self, query: str) -> str:
        """Intelligent query classification for analytics.

        Args:
            query: Raw user query text.

        Returns:
            One of 'policy_inquiry', 'benefits_inquiry', 'leave_inquiry',
            'payroll_inquiry' or 'general_inquiry'.
        """
        query_lower = query.lower()

        # Ordered by priority: the first matching category wins, matching
        # the original if/elif chain.
        categories = [
            ('policy_inquiry', ['policy', 'procedure', 'guideline', 'rule']),
            ('benefits_inquiry', ['benefit', 'insurance', 'health', 'dental', '401k', 'retirement']),
            ('leave_inquiry', ['leave', 'vacation', 'sick', 'pto', 'holiday', 'time off']),
            ('payroll_inquiry', ['salary', 'pay', 'payroll', 'compensation', 'bonus']),
        ]
        for label, keywords in categories:
            if any(keyword in query_lower for keyword in keywords):
                return label
        return 'general_inquiry'
# Global logger instance shared by the module-level convenience helpers.
config = Config()
interaction_logger = InteractionLogger(config)
def validate_api_key(api_key: str) -> bool:
    """
    Validate Google Gemini API key format and basic structure.

    Args:
        api_key: API key string to validate

    Returns:
        True if key appears valid, False otherwise
    """
    if not api_key or not isinstance(api_key, str):
        return False

    # Google API keys typically start with 'AIza' and are 39 characters
    # long; we only sanity-check length, placeholders and character set.
    candidate = api_key.strip()

    # Reject lengths outside the plausible window.
    if not 30 <= len(candidate) <= 50:
        return False

    # Reject obvious placeholder values.
    if candidate.lower() in ('test', 'demo', 'placeholder', 'your_api_key'):
        return False

    # Only alphanumerics, underscores and hyphens are allowed.
    return bool(re.match(r'^[A-Za-z0-9_-]+$', candidate))
def format_response(response_text: str) -> str:
    """
    Intelligently format and enhance AI response for optimal user experience.

    Args:
        response_text: Raw response from AI model

    Returns:
        Formatted and enhanced response text
    """
    if not response_text:
        return "I apologize, but I couldn't generate a response. Please try rephrasing your question."

    cleaned = response_text.strip()

    # Strip common AI boilerplate: openers, mid-text disclaimers, sign-offs.
    for artifact in (
        r'^(As an AI|I am an AI|According to the|Based on the).*?[,.]?\s*',
        r'\b(please note that|it\'s important to note|keep in mind)\b.*?[.!]',
        r'\b(I hope this helps|Hope this helps|Let me know if you need)\b.*?[.!]?$',
    ):
        cleaned = re.sub(artifact, '', cleaned, flags=re.IGNORECASE)

    # Normalize paragraph and sentence structure.
    cleaned = _enhance_text_structure(cleaned)

    # Substantial answers get a professional closing if they lack one.
    if len(cleaned) > 200 and not _has_closing_statement(cleaned):
        cleaned += "\n\nIf you need additional clarification or have related questions, please don't hesitate to ask."

    return cleaned.strip()
| def _enhance_text_structure(text: str) -> str: | |
| """Enhance text structure with better paragraphs and formatting.""" | |
| # Fix paragraph spacing | |
| text = re.sub(r'\n{3,}', '\n\n', text) | |
| # Ensure proper spacing after periods | |
| text = re.sub(r'\.([A-Z])', r'. \1', text) | |
| # Fix common formatting issues | |
| text = re.sub(r'\s+', ' ', text) # Multiple spaces to single | |
| text = re.sub(r'([.!?])\s*\n\s*([a-z])', r'\1 \2', text) # Fix broken sentences | |
| # Enhance list formatting | |
| text = re.sub(r'\n(\d+\.|\*|\-)\s*', r'\n\n\1 ', text) | |
| return text | |
| def _has_closing_statement(text: str) -> bool: | |
| """Check if text already has a professional closing statement.""" | |
| closing_patterns = [ | |
| r'please.*?(contact|reach out|ask|let.*know)', | |
| r'if you.*?(need|have|require)', | |
| r'feel free to.*?(ask|contact|reach)', | |
| r'don\'t hesitate to.*?(ask|contact|reach)' | |
| ] | |
| text_lower = text.lower() | |
| return any(re.search(pattern, text_lower) for pattern in closing_patterns) | |
def log_interaction(query: str, response: str, metadata: Optional[Dict] = None):
    """
    Convenience function for logging user interactions.

    Delegates to the module-level ``interaction_logger`` singleton; the call
    is a no-op when interaction logging is disabled in the configuration.

    Args:
        query: User's question or input
        response: System's response
        metadata: Additional context information
    """
    interaction_logger.log_interaction(query, response, metadata)
def sanitize_filename(filename: str) -> str:
    """
    Sanitize filename for safe storage while preserving readability.

    Args:
        filename: Original filename

    Returns:
        Sanitized filename safe for filesystem operations
    """
    # Replace characters that are problematic on common filesystems.
    sanitized = re.sub(r'[<>:"/\\|?*]', '_', filename)

    # Collapse runs of underscores introduced above.
    sanitized = re.sub(r'_{2,}', '_', sanitized)

    # Cap the stem at 100 characters. Fix: derive stem/suffix from the
    # sanitized name — the original rebuilt the result from the raw
    # `filename`, discarding the character cleanup on this path.
    name, ext = Path(sanitized).stem, Path(sanitized).suffix
    if len(name) > 100:
        sanitized = f"{name[:100]}{ext}"

    # Fall back to a generated name when nothing usable remains.
    if not sanitized or sanitized.startswith('.'):
        sanitized = f"document_{int(time.time())}.pdf"

    return sanitized
def calculate_text_similarity(text1: str, text2: str) -> float:
    """
    Calculate semantic similarity between two text strings using word overlap.

    Args:
        text1: First text string
        text2: Second text string

    Returns:
        Similarity score between 0 and 1
    """
    tokens_a = set(text1.lower().split())
    tokens_b = set(text2.lower().split())

    # Jaccard index: |A ∩ B| / |A ∪ B|. An empty union means no signal.
    union_size = len(tokens_a | tokens_b)
    if union_size == 0:
        return 0.0
    return len(tokens_a & tokens_b) / union_size
def extract_key_phrases(text: str, max_phrases: int = 5) -> List[str]:
    """
    Extract key phrases from text for metadata and search optimization.

    Args:
        text: Input text to analyze
        max_phrases: Maximum number of phrases to extract

    Returns:
        List of key phrases
    """
    # HR-domain vocabulary whose occurrences are weighted double.
    hr_relevant_terms = {
        'policy', 'procedure', 'benefit', 'leave', 'vacation', 'sick', 'health',
        'insurance', 'retirement', '401k', 'pto', 'holiday', 'payroll', 'salary',
        'compensation', 'performance', 'review', 'training', 'onboarding',
        'termination', 'resignation', 'discipline', 'harassment', 'diversity'
    }

    # Score every word of 3+ letters: 2 points per HR term, 1 otherwise.
    scores: Dict[str, int] = {}
    for token in re.findall(r'\b[a-zA-Z]{3,}\b', text.lower()):
        weight = 2 if token in hr_relevant_terms else 1
        scores[token] = scores.get(token, 0) + weight

    # Highest scores first; the stable sort keeps first-seen order on ties.
    ranked = sorted(scores, key=scores.get, reverse=True)
    return ranked[:max_phrases]
def format_timestamp(timestamp: float, format_type: str = 'readable') -> str:
    """
    Format timestamp for display in various contexts.

    Args:
        timestamp: Unix timestamp
        format_type: Type of formatting ('readable', 'short', 'iso')

    Returns:
        Formatted timestamp string
    """
    moment = datetime.fromtimestamp(timestamp)

    if format_type == 'iso':
        return moment.isoformat()

    # strftime templates for the remaining named styles.
    templates = {
        'readable': '%B %d, %Y at %I:%M %p',
        'short': '%m/%d/%Y %H:%M',
    }
    if format_type in templates:
        return moment.strftime(templates[format_type])

    # Unknown style: fall back to the default str() rendering.
    return str(moment)
def estimate_reading_time(text: str) -> int:
    """
    Estimate reading time for text content in minutes.

    Args:
        text: Text content to analyze

    Returns:
        Estimated reading time in minutes
    """
    # Assume ~225 words per minute; never report less than one minute.
    words = len(text.split())
    return max(1, round(words / 225))
def create_document_summary(text: str, max_length: int = 200) -> str:
    """
    Create intelligent document summary for preview purposes.

    Args:
        text: Full document text
        max_length: Maximum summary length in characters

    Returns:
        Document summary; truncated previews end with an ellipsis and
        never exceed ``max_length`` characters.
    """
    # Prefer the first substantial paragraph (> 50 chars) as the preview.
    paragraphs = [p.strip() for p in text.split('\n\n') if len(p.strip()) > 50]

    if not paragraphs:
        return text[:max_length] + '...' if len(text) > max_length else text

    summary = paragraphs[0]

    if len(summary) > max_length:
        # Accumulate whole sentences while they fit under the cap
        # (reserving 3 characters for the ellipsis).
        sentences = summary.split('. ')
        truncated = sentences[0]
        for sentence in sentences[1:]:
            if len(truncated + '. ' + sentence) <= max_length - 3:
                truncated += '. ' + sentence
            else:
                break
        # Fix: the original could still exceed max_length when the first
        # sentence alone was longer than the cap — hard-truncate it.
        if len(truncated) > max_length - 3:
            truncated = truncated[:max_length - 3]
        summary = truncated + '...'

    return summary
def validate_document_content(text: str) -> Tuple[bool, List[str]]:
    """
    Validate document content for HR relevance and quality.

    Args:
        text: Document text to validate

    Returns:
        Tuple of (is_valid, list_of_issues)
    """
    issues: List[str] = []

    # Reject documents with almost no content.
    if len(text.strip()) < 100:
        issues.append("Document content is too short (minimum 100 characters)")

    # A tiny word count suggests a scanned image rather than real text.
    if len(text.split()) < 20:
        issues.append("Document appears to contain very little readable text")

    # Require at least two HR-domain indicator terms.
    hr_indicators = [
        'policy', 'employee', 'benefit', 'leave', 'vacation', 'sick',
        'insurance', 'company', 'workplace', 'procedure', 'guideline',
        'handbook', 'hr', 'human resources', 'personnel'
    ]
    lowered = text.lower()
    matches = sum(1 for indicator in hr_indicators if indicator in lowered)
    if matches < 2:
        issues.append("Document may not be HR-related (consider adding to appropriate knowledge base)")

    # Heavy line repetition usually means a corrupted PDF extraction.
    lines = text.split('\n')
    distinct = {line.strip() for line in lines if line.strip()}
    if len(lines) > 10 and len(distinct) / len(lines) < 0.3:
        issues.append("Document contains excessive repetition (possible extraction error)")

    return not issues, issues
def create_session_analytics() -> Dict[str, Any]:
    """
    Create analytics data for current session.

    Returns:
        Dictionary with session analytics
    """
    # Snapshot the relevant Streamlit session state in one place.
    analytics: Dict[str, Any] = {
        'session_id': interaction_logger._get_session_id(),
        'start_time': st.session_state.get('session_start', time.time()),
        'current_time': time.time(),
        'message_count': len(st.session_state.get('messages', [])),
        'api_key_validated': st.session_state.get('api_key_validated', False),
        'admin_accessed': st.session_state.get('admin_authenticated', False),
    }

    # Elapsed session time, in minutes.
    elapsed_seconds = analytics['current_time'] - analytics['start_time']
    analytics['duration_minutes'] = elapsed_seconds / 60
    return analytics
def safe_json_loads(json_string: str, default: Any = None) -> Any:
    """
    Safely parse JSON string with fallback.

    Args:
        json_string: JSON string to parse
        default: Default value if parsing fails

    Returns:
        Parsed JSON or default value
    """
    # JSONDecodeError covers malformed JSON; TypeError covers non-strings.
    try:
        parsed = json.loads(json_string)
    except (json.JSONDecodeError, TypeError):
        return default
    return parsed
def hash_document_content(content: str) -> str:
    """
    Create content-based hash for deduplication.

    Args:
        content: Document content

    Returns:
        SHA-256 hash of normalized content
    """
    # Case-fold and collapse all whitespace so formatting differences
    # still produce the same digest.
    canonical = re.sub(r'\s+', ' ', content.strip().lower())
    digest = hashlib.sha256(canonical.encode())
    return digest.hexdigest()
def format_file_size(size_bytes: int) -> str:
    """
    Format file size in human-readable format.

    Args:
        size_bytes: File size in bytes

    Returns:
        Formatted size string
    """
    # Bytes are shown exactly; larger units get one decimal place.
    if size_bytes < 1024:
        return f"{size_bytes} B"

    for threshold, unit in ((1024**2, 'KB'), (1024**3, 'MB')):
        if size_bytes < threshold:
            return f"{size_bytes / (threshold / 1024):.1f} {unit}"
    return f"{size_bytes / (1024**3):.1f} GB"
def create_backup_filename(original_filename: str) -> str:
    """
    Create backup filename with timestamp.

    Args:
        original_filename: Original file name

    Returns:
        Backup filename with timestamp
    """
    original = Path(original_filename)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Insert the marker between stem and extension: name_backup_<ts>.ext
    return f"{original.stem}_backup_{stamp}{original.suffix}"
def performance_monitor(func):
    """
    Decorator for monitoring function performance.

    Args:
        func: Function to monitor

    Returns:
        Wrapped function with performance logging
    """
    import functools  # local import: file-level imports may be out of view

    # Fix: without functools.wraps the wrapper hid the wrapped function's
    # __name__/__doc__, which the log messages below rely on.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            # Report how long the call ran before failing, then re-raise.
            execution_time = time.time() - start_time
            interaction_logger.logger.error(
                f"Function {func.__name__} failed after {execution_time:.2f}s: {str(e)}"
            )
            raise
        execution_time = time.time() - start_time
        if execution_time > 5:  # Log operations slower than 5 seconds
            interaction_logger.logger.warning(
                f"Slow operation: {func.__name__} took {execution_time:.2f}s"
            )
        return result

    return wrapper
# Convenience functions for common operations
def get_current_timestamp() -> float:
    """Get current timestamp for consistent time tracking.

    Returns:
        Current Unix time in seconds, as given by ``time.time()``.
    """
    return time.time()
def is_valid_email(email: str) -> bool:
    """Basic email validation for contact forms.

    Args:
        email: Candidate address.

    Returns:
        True when the address matches a simple user@domain.tld shape.
    """
    # Fix: re.fullmatch replaces the original '$'-anchored re.match, whose
    # '$' also matched before a trailing newline and accepted "a@b.com\n".
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    return re.fullmatch(pattern, email) is not None
def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """Intelligently truncate text at word boundaries."""
    if len(text) <= max_length:
        return text

    # Reserve room for the suffix, then prefer to cut at the last space —
    # but only when that keeps at least ~70% of the allowed length.
    clipped = text[:max_length - len(suffix)]
    boundary = clipped.rfind(' ')
    if boundary > max_length * 0.7:
        clipped = clipped[:boundary]
    return clipped + suffix