Spaces:
Runtime error
Runtime error
| """ | |
| Integration tests for chat history persistence. | |
| Tests the complete integration between chat history storage, retrieval, and caching. | |
| """ | |
| import pytest | |
| import time | |
| import json | |
| from unittest.mock import patch, MagicMock | |
| from chat_agent.services.chat_history import ChatHistoryManager | |
| from chat_agent.services.session_manager import SessionManager | |
| from chat_agent.services.chat_agent import ChatAgent | |
| class TestChatHistoryPersistence: | |
| """Integration tests for chat history persistence functionality.""" | |
| def session_manager(self): | |
| """Create session manager for testing.""" | |
| return SessionManager() | |
| def chat_history_manager(self): | |
| """Create chat history manager for testing.""" | |
| return ChatHistoryManager() | |
| def mock_groq_client(self): | |
| """Mock Groq client for testing.""" | |
| with patch('chat_agent.services.groq_client.GroqClient') as mock: | |
| mock_instance = MagicMock() | |
| mock_instance.generate_response.return_value = "Test response from LLM" | |
| mock.return_value = mock_instance | |
| yield mock_instance | |
| def integrated_chat_system(self, mock_groq_client, session_manager, chat_history_manager): | |
| """Create integrated chat system for testing.""" | |
| from chat_agent.services.language_context import LanguageContextManager | |
| language_context_manager = LanguageContextManager() | |
| chat_agent = ChatAgent( | |
| groq_client=mock_groq_client, | |
| session_manager=session_manager, | |
| language_context_manager=language_context_manager, | |
| chat_history_manager=chat_history_manager | |
| ) | |
| return { | |
| 'chat_agent': chat_agent, | |
| 'session_manager': session_manager, | |
| 'chat_history_manager': chat_history_manager, | |
| 'language_context_manager': language_context_manager | |
| } | |
| def test_message_storage_and_retrieval_integration(self, integrated_chat_system): | |
| """Test complete message storage and retrieval workflow.""" | |
| system = integrated_chat_system | |
| user_id = "test-user-storage" | |
| # Create session | |
| session = system['session_manager'].create_session(user_id, language="python") | |
| session_id = session['session_id'] | |
| # Send multiple messages | |
| messages = [ | |
| "What is Python?", | |
| "How do I create a list?", | |
| "Can you explain functions?", | |
| "What are loops?", | |
| "How to handle exceptions?" | |
| ] | |
| for message in messages: | |
| response = system['chat_agent'].process_message( | |
| session_id=session_id, | |
| message=message, | |
| language="python" | |
| ) | |
| assert response is not None | |
| # Test different retrieval methods | |
| # 1. Recent history | |
| recent_history = system['chat_history_manager'].get_recent_history(session_id, limit=6) | |
| assert len(recent_history) == 6 # Last 3 conversations (user + assistant) | |
| # Verify order (most recent first) | |
| assert recent_history[0]['content'] == messages[-3] # 3rd to last user message | |
| assert recent_history[1]['role'] == 'assistant' | |
| assert recent_history[2]['content'] == messages[-2] # 2nd to last user message | |
| assert recent_history[3]['role'] == 'assistant' | |
| assert recent_history[4]['content'] == messages[-1] # Last user message | |
| assert recent_history[5]['role'] == 'assistant' | |
| # 2. Full history | |
| full_history = system['chat_history_manager'].get_full_history(session_id) | |
| assert len(full_history) == 10 # 5 user messages + 5 responses | |
| # Verify chronological order | |
| for i, message in enumerate(messages): | |
| assert full_history[i * 2]['content'] == message | |
| assert full_history[i * 2]['role'] == 'user' | |
| assert full_history[i * 2 + 1]['role'] == 'assistant' | |
| def test_chat_history_caching_integration(self, integrated_chat_system): | |
| """Test integration between Redis cache and database persistence.""" | |
| system = integrated_chat_system | |
| user_id = "test-user-caching" | |
| # Create session | |
| session = system['session_manager'].create_session(user_id, language="python") | |
| session_id = session['session_id'] | |
| # Send messages to populate cache and database | |
| for i in range(15): # More than typical cache limit | |
| message = f"Test message number {i + 1}" | |
| system['chat_agent'].process_message( | |
| session_id=session_id, | |
| message=message, | |
| language="python" | |
| ) | |
| # Test cache behavior | |
| # Recent messages should be fast (from cache) | |
| start_time = time.time() | |
| recent_from_cache = system['chat_history_manager'].get_recent_history(session_id, limit=10) | |
| cache_time = time.time() - start_time | |
| # Full history might be slower (from database) | |
| start_time = time.time() | |
| full_from_db = system['chat_history_manager'].get_full_history(session_id) | |
| db_time = time.time() - start_time | |
| # Verify data consistency | |
| assert len(recent_from_cache) == 10 | |
| assert len(full_from_db) == 30 # 15 user messages + 15 responses | |
| # Recent history should match the last 10 messages from full history | |
| assert recent_from_cache == full_from_db[-10:] | |
| # Cache should generally be faster (though this might not always be true in tests) | |
| print(f"Cache retrieval time: {cache_time:.4f}s") | |
| print(f"Database retrieval time: {db_time:.4f}s") | |
| def test_message_metadata_persistence(self, integrated_chat_system): | |
| """Test that message metadata is properly stored and retrieved.""" | |
| system = integrated_chat_system | |
| user_id = "test-user-metadata" | |
| # Create session | |
| session = system['session_manager'].create_session(user_id, language="python") | |
| session_id = session['session_id'] | |
| # Store message with metadata | |
| test_metadata = { | |
| "source": "web_interface", | |
| "user_agent": "test-browser", | |
| "timestamp_client": "2023-01-01T12:00:00Z" | |
| } | |
| message_id = system['chat_history_manager'].store_message( | |
| session_id=session_id, | |
| role='user', | |
| content="Test message with metadata", | |
| language="python", | |
| metadata=test_metadata | |
| ) | |
| # Store assistant response with different metadata | |
| response_metadata = { | |
| "model": "mixtral-8x7b-32768", | |
| "tokens_used": 150, | |
| "response_time": 0.85 | |
| } | |
| response_id = system['chat_history_manager'].store_message( | |
| session_id=session_id, | |
| role='assistant', | |
| content="Test response with metadata", | |
| language="python", | |
| metadata=response_metadata | |
| ) | |
| # Retrieve and verify metadata | |
| history = system['chat_history_manager'].get_recent_history(session_id, limit=2) | |
| user_message = next(msg for msg in history if msg['role'] == 'user') | |
| assistant_message = next(msg for msg in history if msg['role'] == 'assistant') | |
| # Verify user message metadata | |
| assert user_message['metadata']['source'] == "web_interface" | |
| assert user_message['metadata']['user_agent'] == "test-browser" | |
| # Verify assistant message metadata | |
| assert assistant_message['metadata']['model'] == "mixtral-8x7b-32768" | |
| assert assistant_message['metadata']['tokens_used'] == 150 | |
| def test_chat_history_search_integration(self, integrated_chat_system): | |
| """Test chat history search functionality.""" | |
| system = integrated_chat_system | |
| user_id = "test-user-search" | |
| # Create session | |
| session = system['session_manager'].create_session(user_id, language="python") | |
| session_id = session['session_id'] | |
| # Send messages with searchable content | |
| searchable_messages = [ | |
| "How do I create a Python list?", | |
| "What is a dictionary in Python?", | |
| "Can you explain Python functions?", | |
| "How to use loops in JavaScript?", # Different language | |
| "What are Python classes?" | |
| ] | |
| for message in searchable_messages: | |
| language = "javascript" if "JavaScript" in message else "python" | |
| system['chat_agent'].process_message( | |
| session_id=session_id, | |
| message=message, | |
| language=language | |
| ) | |
| # Test search functionality (if implemented) | |
| if hasattr(system['chat_history_manager'], 'search_messages'): | |
| # Search for Python-related messages | |
| python_results = system['chat_history_manager'].search_messages( | |
| session_id=session_id, | |
| query="Python", | |
| limit=10 | |
| ) | |
| # Should find 4 Python messages (excluding JavaScript one) | |
| python_user_messages = [msg for msg in python_results if msg['role'] == 'user'] | |
| assert len(python_user_messages) == 4 | |
| # Search for specific terms | |
| list_results = system['chat_history_manager'].search_messages( | |
| session_id=session_id, | |
| query="list", | |
| limit=10 | |
| ) | |
| list_user_messages = [msg for msg in list_results if msg['role'] == 'user'] | |
| assert len(list_user_messages) >= 1 | |
| assert any("list" in msg['content'].lower() for msg in list_user_messages) | |
| def test_session_history_isolation(self, integrated_chat_system): | |
| """Test that chat history is properly isolated between sessions.""" | |
| system = integrated_chat_system | |
| # Create multiple sessions for different users | |
| user_sessions = [] | |
| for i in range(3): | |
| user_id = f"test-user-isolation-{i}" | |
| session = system['session_manager'].create_session(user_id, language="python") | |
| user_sessions.append((user_id, session)) | |
| # Send different messages to each session | |
| session_messages = {} | |
| for i, (user_id, session) in enumerate(user_sessions): | |
| session_id = session['session_id'] | |
| messages = [ | |
| f"User {i} message 1: What is Python?", | |
| f"User {i} message 2: How do I code?", | |
| f"User {i} message 3: Explain variables" | |
| ] | |
| session_messages[session_id] = messages | |
| for message in messages: | |
| system['chat_agent'].process_message( | |
| session_id=session_id, | |
| message=message, | |
| language="python" | |
| ) | |
| # Verify history isolation | |
| for user_id, session in user_sessions: | |
| session_id = session['session_id'] | |
| history = system['chat_history_manager'].get_full_history(session_id) | |
| # Should have 6 messages (3 user + 3 assistant) | |
| assert len(history) == 6 | |
| # Verify only this session's messages are present | |
| user_messages = [msg for msg in history if msg['role'] == 'user'] | |
| expected_messages = session_messages[session_id] | |
| for i, user_message in enumerate(user_messages): | |
| assert user_message['content'] == expected_messages[i] | |
| assert f"User {user_sessions.index((user_id, session))}" in user_message['content'] | |
| def test_chat_history_cleanup_integration(self, integrated_chat_system): | |
| """Test chat history cleanup and retention policies.""" | |
| system = integrated_chat_system | |
| user_id = "test-user-cleanup" | |
| # Create session | |
| session = system['session_manager'].create_session(user_id, language="python") | |
| session_id = session['session_id'] | |
| # Send many messages | |
| for i in range(50): | |
| message = f"Test message {i + 1} for cleanup testing" | |
| system['chat_agent'].process_message( | |
| session_id=session_id, | |
| message=message, | |
| language="python" | |
| ) | |
| # Verify all messages are stored | |
| full_history = system['chat_history_manager'].get_full_history(session_id) | |
| assert len(full_history) == 100 # 50 user + 50 assistant messages | |
| # Test cleanup functionality (if implemented) | |
| if hasattr(system['chat_history_manager'], 'cleanup_old_messages'): | |
| # Clean up messages older than a certain threshold | |
| cleanup_result = system['chat_history_manager'].cleanup_old_messages( | |
| session_id=session_id, | |
| keep_recent=20 # Keep only last 20 messages | |
| ) | |
| # Verify cleanup | |
| remaining_history = system['chat_history_manager'].get_full_history(session_id) | |
| assert len(remaining_history) <= 20 | |
| # Verify most recent messages are kept | |
| recent_history = system['chat_history_manager'].get_recent_history(session_id, limit=10) | |
| assert len(recent_history) == 10 | |
| def test_concurrent_history_operations(self, integrated_chat_system): | |
| """Test concurrent chat history operations.""" | |
| system = integrated_chat_system | |
| user_id = "test-user-concurrent" | |
| # Create session | |
| session = system['session_manager'].create_session(user_id, language="python") | |
| session_id = session['session_id'] | |
| import threading | |
| import concurrent.futures | |
| def send_messages(thread_id, num_messages): | |
| """Send messages from a specific thread.""" | |
| results = [] | |
| for i in range(num_messages): | |
| message = f"Thread {thread_id} message {i + 1}" | |
| try: | |
| response = system['chat_agent'].process_message( | |
| session_id=session_id, | |
| message=message, | |
| language="python" | |
| ) | |
| results.append((message, response)) | |
| except Exception as e: | |
| results.append((message, f"Error: {e}")) | |
| return results | |
| # Run concurrent message sending | |
| num_threads = 5 | |
| messages_per_thread = 3 | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: | |
| futures = [ | |
| executor.submit(send_messages, thread_id, messages_per_thread) | |
| for thread_id in range(num_threads) | |
| ] | |
| thread_results = [future.result() for future in concurrent.futures.as_completed(futures)] | |
| # Verify all messages were processed | |
| total_expected_messages = num_threads * messages_per_thread * 2 # user + assistant | |
| # Allow some time for all operations to complete | |
| time.sleep(0.1) | |
| final_history = system['chat_history_manager'].get_full_history(session_id) | |
| # Should have all messages (might be slightly less due to race conditions) | |
| assert len(final_history) >= total_expected_messages * 0.8 # Allow 20% tolerance | |
| # Verify no data corruption | |
| for message in final_history: | |
| assert 'content' in message | |
| assert 'role' in message | |
| assert 'timestamp' in message | |
| assert message['role'] in ['user', 'assistant'] | |
| def test_history_pagination_integration(self, integrated_chat_system): | |
| """Test chat history pagination functionality.""" | |
| system = integrated_chat_system | |
| user_id = "test-user-pagination" | |
| # Create session | |
| session = system['session_manager'].create_session(user_id, language="python") | |
| session_id = session['session_id'] | |
| # Send 25 messages (50 total with responses) | |
| for i in range(25): | |
| message = f"Pagination test message {i + 1}" | |
| system['chat_agent'].process_message( | |
| session_id=session_id, | |
| message=message, | |
| language="python" | |
| ) | |
| # Test pagination (if implemented) | |
| if hasattr(system['chat_history_manager'], 'get_history_page'): | |
| # Get first page | |
| page_1 = system['chat_history_manager'].get_history_page( | |
| session_id=session_id, | |
| page=1, | |
| page_size=10 | |
| ) | |
| assert len(page_1['messages']) == 10 | |
| assert page_1['page'] == 1 | |
| assert page_1['total_pages'] == 5 # 50 messages / 10 per page | |
| # Get second page | |
| page_2 = system['chat_history_manager'].get_history_page( | |
| session_id=session_id, | |
| page=2, | |
| page_size=10 | |
| ) | |
| assert len(page_2['messages']) == 10 | |
| assert page_2['page'] == 2 | |
| # Verify no overlap between pages | |
| page_1_ids = {msg['id'] for msg in page_1['messages']} | |
| page_2_ids = {msg['id'] for msg in page_2['messages']} | |
| assert len(page_1_ids.intersection(page_2_ids)) == 0 | |
| if __name__ == '__main__': | |
| pytest.main([__file__, '-v']) |