Spaces:
Sleeping
Sleeping
"""
Unit tests for storage service producer ID generation.

Feature: 001-refine-memory-producer-logic (T022)

These tests validate the get_next_sequence_number() function, which is critical
for generating unique producer IDs for contacts with colliding normalized names.
"""
import contextlib
import os
import sqlite3
import tempfile

import pytest
# Inline implementation of get_next_sequence_number for testing
def get_next_sequence_number(conn: sqlite3.Connection, user_id: str, normalized_name: str) -> int:
    """Return the next sequence number for a (user_id, normalized_name) pair.

    The value is ``MAX(sequence_number) + 1`` over the matching rows of
    ``contact_sessions``, so gaps in existing numbers are never reused.
    When no row matches, ``COALESCE`` turns the NULL maximum into 0 and the
    function returns 1.

    Args:
        conn: Open SQLite connection to a database that contains the
            ``contact_sessions`` table.
        user_id: Value matched against the ``user_id`` column.
        normalized_name: Value matched against the ``normalized_name`` column.

    Returns:
        The next sequence number to use for this user/name pair.

    Note:
        This function only reads; it does not reserve the number it returns.
    """
    cursor = conn.execute(
        """
        SELECT COALESCE(MAX(sequence_number), 0) + 1
        FROM contact_sessions
        WHERE user_id = ? AND normalized_name = ?
        """,
        (user_id, normalized_name),
    )
    # An aggregate query without GROUP BY always yields exactly one row.
    return cursor.fetchone()[0]
@pytest.fixture
def test_db():
    """Create a temporary SQLite database file and yield its path.

    The database contains the ``contact_sessions`` table (including the
    producer fields ``normalized_name``, ``sequence_number`` and
    ``producer_id``) plus its two indexes. The file is removed once the
    test that requested the fixture has finished.

    Yields:
        Filesystem path of the temporary database file.
    """
    db_fd, db_path = tempfile.mkstemp(suffix=".db")
    # sqlite3 opens the file by path, so the descriptor returned by mkstemp
    # is not needed; release it immediately instead of holding it open.
    os.close(db_fd)
    try:
        conn = sqlite3.connect(db_path)
        try:
            # Create schema with producer fields
            conn.execute("""
                CREATE TABLE IF NOT EXISTS contact_sessions (
                    session_id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    contact_name TEXT NOT NULL,
                    contact_description TEXT,
                    is_reference INTEGER DEFAULT 0,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    last_interaction TEXT DEFAULT CURRENT_TIMESTAMP,
                    normalized_name TEXT,
                    sequence_number INTEGER DEFAULT 1,
                    producer_id TEXT
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_contact_sessions_user
                ON contact_sessions(user_id, last_interaction DESC)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_contact_sessions_normalized
                ON contact_sessions(user_id, normalized_name)
            """)
            conn.commit()
        finally:
            # Tests open their own connections; the setup connection is
            # closed even if schema creation fails.
            conn.close()
        yield db_path
    finally:
        # Cleanup runs even when setup or the requesting test raised.
        os.unlink(db_path)
class TestGetNextSequenceNumber:
    """Test suite for get_next_sequence_number function.

    Each test opens its own connection to the database file provided by the
    ``test_db`` fixture and closes it via ``contextlib.closing`` so the
    connection is released even when a statement raises.
    """

    @staticmethod
    def _insert_contact(conn, session_id, user_id, contact_name, normalized_name, sequence_number):
        """Insert one ``contact_sessions`` row; the caller is responsible for committing."""
        conn.execute(
            """
            INSERT INTO contact_sessions
            (session_id, user_id, contact_name, normalized_name, sequence_number)
            VALUES (?, ?, ?, ?, ?)
            """,
            (session_id, user_id, contact_name, normalized_name, sequence_number),
        )

    def test_first_sequence_number(self, test_db):
        """Test that first contact gets sequence number 1."""
        with contextlib.closing(sqlite3.connect(test_db)) as conn:
            seq = get_next_sequence_number(conn, "testuser", "johndoe")

        assert seq == 1

    def test_incremental_sequence(self, test_db):
        """Test that sequence numbers increment correctly."""
        with contextlib.closing(sqlite3.connect(test_db)) as conn:
            # Insert first contact
            self._insert_contact(conn, "session1", "testuser", "John Doe", "johndoe", 1)
            conn.commit()

            # Get next sequence number
            seq = get_next_sequence_number(conn, "testuser", "johndoe")

        assert seq == 2

    def test_multiple_sequences(self, test_db):
        """Test multiple contacts with same normalized name."""
        # Contacts whose display names differ but share one normalized name
        contacts = [
            ("session1", "testuser", "O'Brien", "obrien", 1),
            ("session2", "testuser", "OBrien", "obrien", 2),
            ("session3", "testuser", "O Brien", "obrien", 3),
        ]
        with contextlib.closing(sqlite3.connect(test_db)) as conn:
            for contact in contacts:
                self._insert_contact(conn, *contact)
            conn.commit()

            # Get next sequence number
            seq = get_next_sequence_number(conn, "testuser", "obrien")

        assert seq == 4

    def test_different_users_separate_sequences(self, test_db):
        """Test that different users have independent sequences."""
        # Same normalized name for two users: user1 has two rows, user2 one
        contacts = [
            ("session1", "user1", "John Doe", "johndoe", 1),
            ("session2", "user1", "John Doe", "johndoe", 2),
            ("session3", "user2", "John Doe", "johndoe", 1),
        ]
        with contextlib.closing(sqlite3.connect(test_db)) as conn:
            for contact in contacts:
                self._insert_contact(conn, *contact)
            conn.commit()

            # User1 should get sequence 3
            seq1 = get_next_sequence_number(conn, "user1", "johndoe")
            # User2 should get sequence 2
            seq2 = get_next_sequence_number(conn, "user2", "johndoe")

        assert seq1 == 3
        assert seq2 == 2

    def test_different_normalized_names_separate_sequences(self, test_db):
        """Test that different normalized names have independent sequences."""
        with contextlib.closing(sqlite3.connect(test_db)) as conn:
            # Insert contacts with different normalized names
            self._insert_contact(conn, "session1", "testuser", "John Doe", "johndoe", 1)
            self._insert_contact(conn, "session2", "testuser", "Jane Smith", "janesmith", 1)
            conn.commit()

            # Both should get sequence 2 for their respective normalized names
            seq_john = get_next_sequence_number(conn, "testuser", "johndoe")
            seq_jane = get_next_sequence_number(conn, "testuser", "janesmith")

        assert seq_john == 2
        assert seq_jane == 2

    def test_gaps_in_sequence(self, test_db):
        """Test that gaps in sequence numbers are handled (uses MAX)."""
        # Insert contacts with gaps in sequence
        contacts = [
            ("session1", "testuser", "John", "john", 1),
            ("session2", "testuser", "John", "john", 3),  # Skip 2
            ("session3", "testuser", "John", "john", 7),  # Big gap
        ]
        with contextlib.closing(sqlite3.connect(test_db)) as conn:
            for contact in contacts:
                self._insert_contact(conn, *contact)
            conn.commit()

            # Should use MAX + 1, not count
            seq = get_next_sequence_number(conn, "testuser", "john")

        assert seq == 8

    def test_collision_prevention(self, test_db):
        """Test repeated allocate-then-insert cycles on a single connection.

        The calls are strictly sequential; this does not exercise real
        concurrency, only that each committed insert is visible to the next
        call.
        """
        sequences = []
        with contextlib.closing(sqlite3.connect(test_db)) as conn:
            for i in range(5):
                seq = get_next_sequence_number(conn, "testuser", "collision")
                # Insert with this sequence so the next call sees it
                self._insert_contact(conn, f"session{i}", "testuser", "Collision", "collision", seq)
                conn.commit()
                sequences.append(seq)

        # All sequences should be unique and sequential
        assert sequences == [1, 2, 3, 4, 5]
        assert len(set(sequences)) == 5  # All unique
class TestProducerIdFormat:
    """Test producer ID format expectations.

    These tests document the ``user_normalized_sequence`` layout named in the
    test docstrings below. They build IDs with a local helper and do not call
    any production code.
    """

    @staticmethod
    def _format_producer_id(user_id, normalized_name, sequence_number):
        """Join the three parts with underscores: ``user_normalized_sequence``."""
        return f"{user_id}_{normalized_name}_{sequence_number}"

    def test_producer_id_format(self):
        """Test expected producer_id format: user_normalized_sequence."""
        # These are the expected formats from the spec; compose each ID from
        # its parts so the assertion is not a literal compared to itself.
        cases = [
            (("testuser", "janedoe", 1), "testuser_janedoe_1"),
            (("testuser", "obrien", 2), "testuser_obrien_2"),
            (("user1", "christiankniep", 1), "user1_christiankniep_1"),
        ]
        for parts, expected in cases:
            assert self._format_producer_id(*parts) == expected

    def test_collision_examples(self):
        """Document collision handling examples from spec."""
        # Per spec: O'Brien, OBrien, O Brien all normalize to "obrien"
        collisions = ["obrien", "obrien", "obrien"]
        assert len(set(collisions)) == 1  # All same normalized name

        # Producer IDs should be: testuser_obrien_1, testuser_obrien_2, testuser_obrien_3
        producer_ids = [
            self._format_producer_id("testuser", normalized, sequence)
            for sequence, normalized in enumerate(collisions, start=1)
        ]
        assert producer_ids == [
            "testuser_obrien_1",
            "testuser_obrien_2",
            "testuser_obrien_3",
        ]
        assert len(set(producer_ids)) == 3  # All unique