""" Integration tests for chat history persistence. Tests the complete integration between chat history storage, retrieval, and caching. """ import pytest import time import json from unittest.mock import patch, MagicMock from chat_agent.services.chat_history import ChatHistoryManager from chat_agent.services.session_manager import SessionManager from chat_agent.services.chat_agent import ChatAgent class TestChatHistoryPersistence: """Integration tests for chat history persistence functionality.""" @pytest.fixture def session_manager(self): """Create session manager for testing.""" return SessionManager() @pytest.fixture def chat_history_manager(self): """Create chat history manager for testing.""" return ChatHistoryManager() @pytest.fixture def mock_groq_client(self): """Mock Groq client for testing.""" with patch('chat_agent.services.groq_client.GroqClient') as mock: mock_instance = MagicMock() mock_instance.generate_response.return_value = "Test response from LLM" mock.return_value = mock_instance yield mock_instance @pytest.fixture def integrated_chat_system(self, mock_groq_client, session_manager, chat_history_manager): """Create integrated chat system for testing.""" from chat_agent.services.language_context import LanguageContextManager language_context_manager = LanguageContextManager() chat_agent = ChatAgent( groq_client=mock_groq_client, session_manager=session_manager, language_context_manager=language_context_manager, chat_history_manager=chat_history_manager ) return { 'chat_agent': chat_agent, 'session_manager': session_manager, 'chat_history_manager': chat_history_manager, 'language_context_manager': language_context_manager } def test_message_storage_and_retrieval_integration(self, integrated_chat_system): """Test complete message storage and retrieval workflow.""" system = integrated_chat_system user_id = "test-user-storage" # Create session session = system['session_manager'].create_session(user_id, language="python") session_id = session['session_id'] # Send multiple messages messages = [ "What is Python?", "How do I create a list?", "Can you explain functions?", "What are loops?", "How to handle exceptions?" ] for message in messages: response = system['chat_agent'].process_message( session_id=session_id, message=message, language="python" ) assert response is not None # Test different retrieval methods # 1. Recent history recent_history = system['chat_history_manager'].get_recent_history(session_id, limit=6) assert len(recent_history) == 6 # Last 3 conversations (user + assistant) # Verify order (most recent first) assert recent_history[0]['content'] == messages[-3] # 3rd to last user message assert recent_history[1]['role'] == 'assistant' assert recent_history[2]['content'] == messages[-2] # 2nd to last user message assert recent_history[3]['role'] == 'assistant' assert recent_history[4]['content'] == messages[-1] # Last user message assert recent_history[5]['role'] == 'assistant' # 2. Full history full_history = system['chat_history_manager'].get_full_history(session_id) assert len(full_history) == 10 # 5 user messages + 5 responses # Verify chronological order for i, message in enumerate(messages): assert full_history[i * 2]['content'] == message assert full_history[i * 2]['role'] == 'user' assert full_history[i * 2 + 1]['role'] == 'assistant' def test_chat_history_caching_integration(self, integrated_chat_system): """Test integration between Redis cache and database persistence.""" system = integrated_chat_system user_id = "test-user-caching" # Create session session = system['session_manager'].create_session(user_id, language="python") session_id = session['session_id'] # Send messages to populate cache and database for i in range(15): # More than typical cache limit message = f"Test message number {i + 1}" system['chat_agent'].process_message( session_id=session_id, message=message, language="python" ) # Test cache behavior # Recent messages should be fast (from cache) start_time = time.time() recent_from_cache = system['chat_history_manager'].get_recent_history(session_id, limit=10) cache_time = time.time() - start_time # Full history might be slower (from database) start_time = time.time() full_from_db = system['chat_history_manager'].get_full_history(session_id) db_time = time.time() - start_time # Verify data consistency assert len(recent_from_cache) == 10 assert len(full_from_db) == 30 # 15 user messages + 15 responses # Recent history should match the last 10 messages from full history assert recent_from_cache == full_from_db[-10:] # Cache should generally be faster (though this might not always be true in tests) print(f"Cache retrieval time: {cache_time:.4f}s") print(f"Database retrieval time: {db_time:.4f}s") def test_message_metadata_persistence(self, integrated_chat_system): """Test that message metadata is properly stored and retrieved.""" system = integrated_chat_system user_id = "test-user-metadata" # Create session session = system['session_manager'].create_session(user_id, language="python") session_id = session['session_id'] # Store message with metadata test_metadata = { "source": "web_interface", "user_agent": "test-browser", "timestamp_client": "2023-01-01T12:00:00Z" } message_id = system['chat_history_manager'].store_message( session_id=session_id, role='user', content="Test message with metadata", language="python", metadata=test_metadata ) # Store assistant response with different metadata response_metadata = { "model": "mixtral-8x7b-32768", "tokens_used": 150, "response_time": 0.85 } response_id = system['chat_history_manager'].store_message( session_id=session_id, role='assistant', content="Test response with metadata", language="python", metadata=response_metadata ) # Retrieve and verify metadata history = system['chat_history_manager'].get_recent_history(session_id, limit=2) user_message = next(msg for msg in history if msg['role'] == 'user') assistant_message = next(msg for msg in history if msg['role'] == 'assistant') # Verify user message metadata assert user_message['metadata']['source'] == "web_interface" assert user_message['metadata']['user_agent'] == "test-browser" # Verify assistant message metadata assert assistant_message['metadata']['model'] == "mixtral-8x7b-32768" assert assistant_message['metadata']['tokens_used'] == 150 def test_chat_history_search_integration(self, integrated_chat_system): """Test chat history search functionality.""" system = integrated_chat_system user_id = "test-user-search" # Create session session = system['session_manager'].create_session(user_id, language="python") session_id = session['session_id'] # Send messages with searchable content searchable_messages = [ "How do I create a Python list?", "What is a dictionary in Python?", "Can you explain Python functions?", "How to use loops in JavaScript?", # Different language "What are Python classes?" ] for message in searchable_messages: language = "javascript" if "JavaScript" in message else "python" system['chat_agent'].process_message( session_id=session_id, message=message, language=language ) # Test search functionality (if implemented) if hasattr(system['chat_history_manager'], 'search_messages'): # Search for Python-related messages python_results = system['chat_history_manager'].search_messages( session_id=session_id, query="Python", limit=10 ) # Should find 4 Python messages (excluding JavaScript one) python_user_messages = [msg for msg in python_results if msg['role'] == 'user'] assert len(python_user_messages) == 4 # Search for specific terms list_results = system['chat_history_manager'].search_messages( session_id=session_id, query="list", limit=10 ) list_user_messages = [msg for msg in list_results if msg['role'] == 'user'] assert len(list_user_messages) >= 1 assert any("list" in msg['content'].lower() for msg in list_user_messages) def test_session_history_isolation(self, integrated_chat_system): """Test that chat history is properly isolated between sessions.""" system = integrated_chat_system # Create multiple sessions for different users user_sessions = [] for i in range(3): user_id = f"test-user-isolation-{i}" session = system['session_manager'].create_session(user_id, language="python") user_sessions.append((user_id, session)) # Send different messages to each session session_messages = {} for i, (user_id, session) in enumerate(user_sessions): session_id = session['session_id'] messages = [ f"User {i} message 1: What is Python?", f"User {i} message 2: How do I code?", f"User {i} message 3: Explain variables" ] session_messages[session_id] = messages for message in messages: system['chat_agent'].process_message( session_id=session_id, message=message, language="python" ) # Verify history isolation for user_id, session in user_sessions: session_id = session['session_id'] history = system['chat_history_manager'].get_full_history(session_id) # Should have 6 messages (3 user + 3 assistant) assert len(history) == 6 # Verify only this session's messages are present user_messages = [msg for msg in history if msg['role'] == 'user'] expected_messages = session_messages[session_id] for i, user_message in enumerate(user_messages): assert user_message['content'] == expected_messages[i] assert f"User {user_sessions.index((user_id, session))}" in user_message['content'] def test_chat_history_cleanup_integration(self, integrated_chat_system): """Test chat history cleanup and retention policies.""" system = integrated_chat_system user_id = "test-user-cleanup" # Create session session = system['session_manager'].create_session(user_id, language="python") session_id = session['session_id'] # Send many messages for i in range(50): message = f"Test message {i + 1} for cleanup testing" system['chat_agent'].process_message( session_id=session_id, message=message, language="python" ) # Verify all messages are stored full_history = system['chat_history_manager'].get_full_history(session_id) assert len(full_history) == 100 # 50 user + 50 assistant messages # Test cleanup functionality (if implemented) if hasattr(system['chat_history_manager'], 'cleanup_old_messages'): # Clean up messages older than a certain threshold cleanup_result = system['chat_history_manager'].cleanup_old_messages( session_id=session_id, keep_recent=20 # Keep only last 20 messages ) # Verify cleanup remaining_history = system['chat_history_manager'].get_full_history(session_id) assert len(remaining_history) <= 20 # Verify most recent messages are kept recent_history = system['chat_history_manager'].get_recent_history(session_id, limit=10) assert len(recent_history) == 10 def test_concurrent_history_operations(self, integrated_chat_system): """Test concurrent chat history operations.""" system = integrated_chat_system user_id = "test-user-concurrent" # Create session session = system['session_manager'].create_session(user_id, language="python") session_id = session['session_id'] import threading import concurrent.futures def send_messages(thread_id, num_messages): """Send messages from a specific thread.""" results = [] for i in range(num_messages): message = f"Thread {thread_id} message {i + 1}" try: response = system['chat_agent'].process_message( session_id=session_id, message=message, language="python" ) results.append((message, response)) except Exception as e: results.append((message, f"Error: {e}")) return results # Run concurrent message sending num_threads = 5 messages_per_thread = 3 with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: futures = [ executor.submit(send_messages, thread_id, messages_per_thread) for thread_id in range(num_threads) ] thread_results = [future.result() for future in concurrent.futures.as_completed(futures)] # Verify all messages were processed total_expected_messages = num_threads * messages_per_thread * 2 # user + assistant # Allow some time for all operations to complete time.sleep(0.1) final_history = system['chat_history_manager'].get_full_history(session_id) # Should have all messages (might be slightly less due to race conditions) assert len(final_history) >= total_expected_messages * 0.8 # Allow 20% tolerance # Verify no data corruption for message in final_history: assert 'content' in message assert 'role' in message assert 'timestamp' in message assert message['role'] in ['user', 'assistant'] def test_history_pagination_integration(self, integrated_chat_system): """Test chat history pagination functionality.""" system = integrated_chat_system user_id = "test-user-pagination" # Create session session = system['session_manager'].create_session(user_id, language="python") session_id = session['session_id'] # Send 25 messages (50 total with responses) for i in range(25): message = f"Pagination test message {i + 1}" system['chat_agent'].process_message( session_id=session_id, message=message, language="python" ) # Test pagination (if implemented) if hasattr(system['chat_history_manager'], 'get_history_page'): # Get first page page_1 = system['chat_history_manager'].get_history_page( session_id=session_id, page=1, page_size=10 ) assert len(page_1['messages']) == 10 assert page_1['page'] == 1 assert page_1['total_pages'] == 5 # 50 messages / 10 per page # Get second page page_2 = system['chat_history_manager'].get_history_page( session_id=session_id, page=2, page_size=10 ) assert len(page_2['messages']) == 10 assert page_2['page'] == 2 # Verify no overlap between pages page_1_ids = {msg['id'] for msg in page_1['messages']} page_2_ids = {msg['id'] for msg in page_2['messages']} assert len(page_1_ids.intersection(page_2_ids)) == 0 if __name__ == '__main__': pytest.main([__file__, '-v'])