scratch_chat / tests /integration /test_chat_history_persistence.py
WebashalarForML's picture
Upload 178 files
330b6e4 verified
"""
Integration tests for chat history persistence.
Tests the complete integration between chat history storage, retrieval, and caching.
"""
import pytest
import time
import json
from unittest.mock import patch, MagicMock
from chat_agent.services.chat_history import ChatHistoryManager
from chat_agent.services.session_manager import SessionManager
from chat_agent.services.chat_agent import ChatAgent
class TestChatHistoryPersistence:
    """Integration tests for chat history persistence functionality.

    Each test drives a ``ChatAgent`` that is wired to a mocked Groq client
    together with the ``SessionManager`` and ``ChatHistoryManager`` built by
    the fixtures below, and then inspects what the history manager returns.

    Tests for optional ``ChatHistoryManager`` features (search, cleanup,
    pagination) are reported as *skipped* rather than silently passing when
    the corresponding method does not exist.
    """

    @pytest.fixture
    def session_manager(self):
        """Create session manager for testing."""
        return SessionManager()

    @pytest.fixture
    def chat_history_manager(self):
        """Create chat history manager for testing."""
        return ChatHistoryManager()

    @pytest.fixture
    def mock_groq_client(self):
        """Yield a ``MagicMock`` standing in for the Groq client.

        ``GroqClient`` is patched for the lifetime of the fixture and the
        mock's ``generate_response`` always returns a fixed string, so no
        real LLM call is made by code that goes through this mock.
        """
        with patch('chat_agent.services.groq_client.GroqClient') as mock:
            mock_instance = MagicMock()
            mock_instance.generate_response.return_value = "Test response from LLM"
            mock.return_value = mock_instance
            yield mock_instance

    @pytest.fixture
    def integrated_chat_system(self, mock_groq_client, session_manager, chat_history_manager):
        """Create integrated chat system for testing.

        Returns:
            dict with the keys ``'chat_agent'``, ``'session_manager'``,
            ``'chat_history_manager'`` and ``'language_context_manager'``.
        """
        # Imported locally, as in the original module, so that collecting
        # this file does not require the language-context module up front.
        from chat_agent.services.language_context import LanguageContextManager

        language_context_manager = LanguageContextManager()
        chat_agent = ChatAgent(
            groq_client=mock_groq_client,
            session_manager=session_manager,
            language_context_manager=language_context_manager,
            chat_history_manager=chat_history_manager
        )
        return {
            'chat_agent': chat_agent,
            'session_manager': session_manager,
            'chat_history_manager': chat_history_manager,
            'language_context_manager': language_context_manager
        }

    def test_message_storage_and_retrieval_integration(self, integrated_chat_system):
        """Test complete message storage and retrieval workflow."""
        system = integrated_chat_system
        user_id = "test-user-storage"

        # Create session
        session = system['session_manager'].create_session(user_id, language="python")
        session_id = session['session_id']

        # Send multiple messages
        messages = [
            "What is Python?",
            "How do I create a list?",
            "Can you explain functions?",
            "What are loops?",
            "How to handle exceptions?"
        ]
        for message in messages:
            response = system['chat_agent'].process_message(
                session_id=session_id,
                message=message,
                language="python"
            )
            assert response is not None

        # Test different retrieval methods
        # 1. Recent history
        recent_history = system['chat_history_manager'].get_recent_history(session_id, limit=6)
        assert len(recent_history) == 6  # Last 3 conversations (user + assistant)

        # Verify order: chronological (oldest first) within the recent
        # window, i.e. the window ends with the latest exchange.
        assert recent_history[0]['content'] == messages[-3]  # 3rd to last user message
        assert recent_history[1]['role'] == 'assistant'
        assert recent_history[2]['content'] == messages[-2]  # 2nd to last user message
        assert recent_history[3]['role'] == 'assistant'
        assert recent_history[4]['content'] == messages[-1]  # Last user message
        assert recent_history[5]['role'] == 'assistant'

        # 2. Full history
        full_history = system['chat_history_manager'].get_full_history(session_id)
        assert len(full_history) == 10  # 5 user messages + 5 responses

        # Verify chronological order: user/assistant entries alternate.
        for i, message in enumerate(messages):
            assert full_history[i * 2]['content'] == message
            assert full_history[i * 2]['role'] == 'user'
            assert full_history[i * 2 + 1]['role'] == 'assistant'

    def test_chat_history_caching_integration(self, integrated_chat_system):
        """Test that recent history agrees with the tail of the full history.

        Presumably recent history is served from a cache and full history
        from the database (TODO confirm against ``ChatHistoryManager``); the
        test itself only asserts data consistency and prints both timings.
        """
        system = integrated_chat_system
        user_id = "test-user-caching"

        # Create session
        session = system['session_manager'].create_session(user_id, language="python")
        session_id = session['session_id']

        # Send messages to populate cache and database
        for i in range(15):  # More than typical cache limit
            message = f"Test message number {i + 1}"
            system['chat_agent'].process_message(
                session_id=session_id,
                message=message,
                language="python"
            )

        # perf_counter() is monotonic and high resolution, which makes it the
        # appropriate clock for measuring short durations.
        start_time = time.perf_counter()
        recent_from_cache = system['chat_history_manager'].get_recent_history(session_id, limit=10)
        cache_time = time.perf_counter() - start_time

        start_time = time.perf_counter()
        full_from_db = system['chat_history_manager'].get_full_history(session_id)
        db_time = time.perf_counter() - start_time

        # Verify data consistency
        assert len(recent_from_cache) == 10
        assert len(full_from_db) == 30  # 15 user messages + 15 responses

        # Recent history should match the last 10 messages from full history
        assert recent_from_cache == full_from_db[-10:]

        # Timings are informational only; they are deliberately not asserted
        # because relative speed is not guaranteed in a test environment.
        print(f"Cache retrieval time: {cache_time:.4f}s")
        print(f"Database retrieval time: {db_time:.4f}s")

    def test_message_metadata_persistence(self, integrated_chat_system):
        """Test that message metadata is properly stored and retrieved."""
        system = integrated_chat_system
        user_id = "test-user-metadata"

        # Create session
        session = system['session_manager'].create_session(user_id, language="python")
        session_id = session['session_id']

        # Store message with metadata
        test_metadata = {
            "source": "web_interface",
            "user_agent": "test-browser",
            "timestamp_client": "2023-01-01T12:00:00Z"
        }
        system['chat_history_manager'].store_message(
            session_id=session_id,
            role='user',
            content="Test message with metadata",
            language="python",
            metadata=test_metadata
        )

        # Store assistant response with different metadata
        response_metadata = {
            "model": "mixtral-8x7b-32768",
            "tokens_used": 150,
            "response_time": 0.85
        }
        system['chat_history_manager'].store_message(
            session_id=session_id,
            role='assistant',
            content="Test response with metadata",
            language="python",
            metadata=response_metadata
        )

        # Retrieve and verify metadata
        history = system['chat_history_manager'].get_recent_history(session_id, limit=2)
        user_message = next(msg for msg in history if msg['role'] == 'user')
        assistant_message = next(msg for msg in history if msg['role'] == 'assistant')

        # Verify user message metadata
        assert user_message['metadata']['source'] == "web_interface"
        assert user_message['metadata']['user_agent'] == "test-browser"

        # Verify assistant message metadata
        assert assistant_message['metadata']['model'] == "mixtral-8x7b-32768"
        assert assistant_message['metadata']['tokens_used'] == 150

    def test_chat_history_search_integration(self, integrated_chat_system):
        """Test chat history search functionality.

        Skipped when ``ChatHistoryManager`` has no ``search_messages`` method.
        """
        system = integrated_chat_system
        user_id = "test-user-search"

        # Create session
        session = system['session_manager'].create_session(user_id, language="python")
        session_id = session['session_id']

        # Send messages with searchable content
        searchable_messages = [
            "How do I create a Python list?",
            "What is a dictionary in Python?",
            "Can you explain Python functions?",
            "How to use loops in JavaScript?",  # Different language
            "What are Python classes?"
        ]
        for message in searchable_messages:
            language = "javascript" if "JavaScript" in message else "python"
            system['chat_agent'].process_message(
                session_id=session_id,
                message=message,
                language=language
            )

        # Search is optional: report a skip instead of a misleading pass
        # when the feature is absent.
        if not hasattr(system['chat_history_manager'], 'search_messages'):
            pytest.skip("ChatHistoryManager.search_messages is not implemented")

        # Search for Python-related messages
        python_results = system['chat_history_manager'].search_messages(
            session_id=session_id,
            query="Python",
            limit=10
        )

        # Should find 4 Python messages (excluding JavaScript one)
        python_user_messages = [msg for msg in python_results if msg['role'] == 'user']
        assert len(python_user_messages) == 4

        # Search for specific terms
        list_results = system['chat_history_manager'].search_messages(
            session_id=session_id,
            query="list",
            limit=10
        )
        list_user_messages = [msg for msg in list_results if msg['role'] == 'user']
        assert len(list_user_messages) >= 1
        assert any("list" in msg['content'].lower() for msg in list_user_messages)

    def test_session_history_isolation(self, integrated_chat_system):
        """Test that chat history is properly isolated between sessions."""
        system = integrated_chat_system

        # Create multiple sessions for different users
        user_sessions = []
        for i in range(3):
            user_id = f"test-user-isolation-{i}"
            session = system['session_manager'].create_session(user_id, language="python")
            user_sessions.append((user_id, session))

        # Send different messages to each session
        session_messages = {}
        for i, (user_id, session) in enumerate(user_sessions):
            session_id = session['session_id']
            messages = [
                f"User {i} message 1: What is Python?",
                f"User {i} message 2: How do I code?",
                f"User {i} message 3: Explain variables"
            ]
            session_messages[session_id] = messages
            for message in messages:
                system['chat_agent'].process_message(
                    session_id=session_id,
                    message=message,
                    language="python"
                )

        # Verify history isolation
        for index, (_user_id, session) in enumerate(user_sessions):
            session_id = session['session_id']
            history = system['chat_history_manager'].get_full_history(session_id)

            # Should have 6 messages (3 user + 3 assistant)
            assert len(history) == 6

            # Verify only this session's messages are present. Comparing the
            # whole list also catches a missing or extra user message, which
            # a per-item loop over the retrieved messages would not.
            user_messages = [msg for msg in history if msg['role'] == 'user']
            expected_messages = session_messages[session_id]
            assert [msg['content'] for msg in user_messages] == expected_messages
            for user_message in user_messages:
                assert f"User {index}" in user_message['content']

    def test_chat_history_cleanup_integration(self, integrated_chat_system):
        """Test chat history cleanup and retention policies.

        Storage of all messages is always verified; the cleanup part is
        skipped when ``ChatHistoryManager`` has no ``cleanup_old_messages``.
        """
        system = integrated_chat_system
        user_id = "test-user-cleanup"

        # Create session
        session = system['session_manager'].create_session(user_id, language="python")
        session_id = session['session_id']

        # Send many messages
        for i in range(50):
            message = f"Test message {i + 1} for cleanup testing"
            system['chat_agent'].process_message(
                session_id=session_id,
                message=message,
                language="python"
            )

        # Verify all messages are stored
        full_history = system['chat_history_manager'].get_full_history(session_id)
        assert len(full_history) == 100  # 50 user + 50 assistant messages

        # Cleanup is optional: report a skip instead of a misleading pass
        # when the feature is absent.
        if not hasattr(system['chat_history_manager'], 'cleanup_old_messages'):
            pytest.skip("ChatHistoryManager.cleanup_old_messages is not implemented")

        # Clean up messages older than a certain threshold
        system['chat_history_manager'].cleanup_old_messages(
            session_id=session_id,
            keep_recent=20  # Keep only last 20 messages
        )

        # Verify cleanup
        remaining_history = system['chat_history_manager'].get_full_history(session_id)
        assert len(remaining_history) <= 20

        # Verify most recent messages are kept
        recent_history = system['chat_history_manager'].get_recent_history(session_id, limit=10)
        assert len(recent_history) == 10

    def test_concurrent_history_operations(self, integrated_chat_system):
        """Test concurrent chat history operations.

        Five worker threads each send three messages to the same session.
        Individual failures are tolerated (at least 80% of the expected
        history entries must exist), but every stored entry must be
        well-formed.
        """
        system = integrated_chat_system
        user_id = "test-user-concurrent"

        # Create session
        session = system['session_manager'].create_session(user_id, language="python")
        session_id = session['session_id']

        import concurrent.futures

        def send_messages(thread_id, num_messages):
            """Send messages from a specific thread.

            Returns a list of ``(message, outcome)`` tuples where the outcome
            is either the agent's response or an ``"Error: ..."`` string.
            """
            results = []
            for i in range(num_messages):
                message = f"Thread {thread_id} message {i + 1}"
                try:
                    response = system['chat_agent'].process_message(
                        session_id=session_id,
                        message=message,
                        language="python"
                    )
                except Exception as e:
                    # Deliberately best-effort: a failing call must not stop
                    # the remaining messages of this worker.
                    results.append((message, f"Error: {e}"))
                else:
                    results.append((message, response))
            return results

        # Run concurrent message sending
        num_threads = 5
        messages_per_thread = 3
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = [
                executor.submit(send_messages, thread_id, messages_per_thread)
                for thread_id in range(num_threads)
            ]
            thread_results = [future.result() for future in concurrent.futures.as_completed(futures)]

        # Verify all messages were processed
        total_expected_messages = num_threads * messages_per_thread * 2  # user + assistant

        # Allow some time for all operations to complete
        time.sleep(0.1)
        final_history = system['chat_history_manager'].get_full_history(session_id)

        # Should have all messages (might be slightly less due to race conditions).
        # The per-thread outcomes are included in the failure message because
        # worker exceptions are otherwise swallowed and invisible.
        assert len(final_history) >= total_expected_messages * 0.8, (  # Allow 20% tolerance
            f"expected at least 80% of {total_expected_messages} history entries, "
            f"got {len(final_history)}; per-thread results: {thread_results}"
        )

        # Verify no data corruption
        for message in final_history:
            assert 'content' in message
            assert 'role' in message
            assert 'timestamp' in message
            assert message['role'] in ['user', 'assistant']

    def test_history_pagination_integration(self, integrated_chat_system):
        """Test chat history pagination functionality.

        Skipped when ``ChatHistoryManager`` has no ``get_history_page`` method.
        """
        system = integrated_chat_system
        user_id = "test-user-pagination"

        # Create session
        session = system['session_manager'].create_session(user_id, language="python")
        session_id = session['session_id']

        # Send 25 messages (50 total with responses)
        for i in range(25):
            message = f"Pagination test message {i + 1}"
            system['chat_agent'].process_message(
                session_id=session_id,
                message=message,
                language="python"
            )

        # Pagination is optional: report a skip instead of a misleading pass
        # when the feature is absent.
        if not hasattr(system['chat_history_manager'], 'get_history_page'):
            pytest.skip("ChatHistoryManager.get_history_page is not implemented")

        # Get first page
        page_1 = system['chat_history_manager'].get_history_page(
            session_id=session_id,
            page=1,
            page_size=10
        )
        assert len(page_1['messages']) == 10
        assert page_1['page'] == 1
        assert page_1['total_pages'] == 5  # 50 messages / 10 per page

        # Get second page
        page_2 = system['chat_history_manager'].get_history_page(
            session_id=session_id,
            page=2,
            page_size=10
        )
        assert len(page_2['messages']) == 10
        assert page_2['page'] == 2

        # Verify no overlap between pages
        page_1_ids = {msg['id'] for msg in page_1['messages']}
        page_2_ids = {msg['id'] for msg in page_2['messages']}
        assert len(page_1_ids.intersection(page_2_ids)) == 0
if __name__ == '__main__':
    # pytest.main() returns the run's exit code; propagate it so that running
    # this file as a script exits non-zero when a test fails.
    raise SystemExit(pytest.main([__file__, '-v']))