MemPrepMate / tests /unit /test_storage_service.py
Christian Kniep
new webapp
1fff71f
"""
Unit tests for storage service producer ID generation.
Feature: 001-refine-memory-producer-logic (T022)
These tests validate the get_next_sequence_number() function which is critical
for generating unique producer IDs for contacts with colliding normalized names.
"""
import os
import sqlite3
import tempfile
from contextlib import closing

import pytest
# Inline implementation of get_next_sequence_number for testing
def get_next_sequence_number(conn: sqlite3.Connection, user_id: str, normalized_name: str) -> int:
    """Return the sequence number to assign to the next matching contact.

    Looks up the highest ``sequence_number`` stored in ``contact_sessions``
    for the given ``user_id`` / ``normalized_name`` pair and returns that
    value plus one. When no row matches, ``COALESCE`` turns the NULL
    aggregate into 0, so the result is 1.

    Args:
        conn: Open SQLite connection with a ``contact_sessions`` table.
        user_id: Owner whose contacts are searched.
        normalized_name: Normalized contact name to match exactly.

    Returns:
        The highest stored sequence number plus one (1 when nothing matches).
    """
    lookup_params = (user_id, normalized_name)
    result = conn.execute(
        """
        SELECT COALESCE(MAX(sequence_number), 0) + 1
        FROM contact_sessions
        WHERE user_id = ? AND normalized_name = ?
        """,
        lookup_params,
    )
    # An aggregate query without GROUP BY always yields exactly one row.
    (next_number,) = result.fetchone()
    return next_number
@pytest.fixture
def test_db():
    """Yield the path of a temporary SQLite database with the test schema.

    The database contains an empty ``contact_sessions`` table (including the
    producer fields ``normalized_name``, ``sequence_number`` and
    ``producer_id``) plus its two indexes. Tests are expected to open their
    own connections to the yielded path.

    Yields:
        Filesystem path of the temporary ``.db`` file. The file is removed
        during teardown, and also when schema creation itself fails.
    """
    db_fd, db_path = tempfile.mkstemp(suffix=".db")
    # sqlite3 opens the file by path, so the raw descriptor is released right
    # away instead of being carried through the whole test.
    os.close(db_fd)
    try:
        # Create schema with producer fields. The setup connection is closed
        # before yielding so no handle of ours outlives schema creation.
        with closing(sqlite3.connect(db_path)) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS contact_sessions (
                    session_id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    contact_name TEXT NOT NULL,
                    contact_description TEXT,
                    is_reference INTEGER DEFAULT 0,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    last_interaction TEXT DEFAULT CURRENT_TIMESTAMP,
                    normalized_name TEXT,
                    sequence_number INTEGER DEFAULT 1,
                    producer_id TEXT
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_contact_sessions_user
                ON contact_sessions(user_id, last_interaction DESC)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_contact_sessions_normalized
                ON contact_sessions(user_id, normalized_name)
            """)
            conn.commit()
        yield db_path
    finally:
        # Cleanup runs even if schema creation raised before the yield.
        os.unlink(db_path)
class TestGetNextSequenceNumber:
    """Test suite for get_next_sequence_number function.

    Each test opens its own connection to the database file supplied by the
    ``test_db`` fixture. Connections are wrapped in ``closing`` so the handle
    is released even when an insert or query raises.
    """

    # Single parameterized INSERT shared by every test. Columns not listed
    # here are left to whatever defaults the table defines.
    _INSERT_CONTACT_SQL = """
        INSERT INTO contact_sessions
        (session_id, user_id, contact_name, normalized_name, sequence_number)
        VALUES (?, ?, ?, ?, ?)
    """

    @classmethod
    def _insert_contacts(cls, conn, rows):
        """Insert contact rows and commit.

        Args:
            conn: Open SQLite connection to the test database.
            rows: Iterable of ``(session_id, user_id, contact_name,
                normalized_name, sequence_number)`` tuples.
        """
        conn.executemany(cls._INSERT_CONTACT_SQL, rows)
        conn.commit()

    def test_first_sequence_number(self, test_db):
        """Test that first contact gets sequence number 1."""
        with closing(sqlite3.connect(test_db)) as conn:
            seq = get_next_sequence_number(conn, "testuser", "johndoe")

        assert seq == 1

    def test_incremental_sequence(self, test_db):
        """Test that sequence numbers increment correctly."""
        with closing(sqlite3.connect(test_db)) as conn:
            # Insert first contact
            self._insert_contacts(
                conn,
                [("session1", "testuser", "John Doe", "johndoe", 1)],
            )
            # Get next sequence number
            seq = get_next_sequence_number(conn, "testuser", "johndoe")

        assert seq == 2

    def test_multiple_sequences(self, test_db):
        """Test multiple contacts with same normalized name."""
        # Contacts whose display names differ but share one normalized name
        contacts = [
            ("session1", "testuser", "O'Brien", "obrien", 1),
            ("session2", "testuser", "OBrien", "obrien", 2),
            ("session3", "testuser", "O Brien", "obrien", 3),
        ]
        with closing(sqlite3.connect(test_db)) as conn:
            self._insert_contacts(conn, contacts)
            # Get next sequence number
            seq = get_next_sequence_number(conn, "testuser", "obrien")

        assert seq == 4

    def test_different_users_separate_sequences(self, test_db):
        """Test that different users have independent sequences."""
        # Same normalized name for two different users
        contacts = [
            ("session1", "user1", "John Doe", "johndoe", 1),
            ("session2", "user1", "John Doe", "johndoe", 2),
            ("session3", "user2", "John Doe", "johndoe", 1),
        ]
        with closing(sqlite3.connect(test_db)) as conn:
            self._insert_contacts(conn, contacts)
            # User1 should get sequence 3
            seq1 = get_next_sequence_number(conn, "user1", "johndoe")
            # User2 should get sequence 2
            seq2 = get_next_sequence_number(conn, "user2", "johndoe")

        assert seq1 == 3
        assert seq2 == 2

    def test_different_normalized_names_separate_sequences(self, test_db):
        """Test that different normalized names have independent sequences."""
        contacts = [
            ("session1", "testuser", "John Doe", "johndoe", 1),
            ("session2", "testuser", "Jane Smith", "janesmith", 1),
        ]
        with closing(sqlite3.connect(test_db)) as conn:
            self._insert_contacts(conn, contacts)
            # Both should get sequence 2 for their respective normalized names
            seq_john = get_next_sequence_number(conn, "testuser", "johndoe")
            seq_jane = get_next_sequence_number(conn, "testuser", "janesmith")

        assert seq_john == 2
        assert seq_jane == 2

    def test_gaps_in_sequence(self, test_db):
        """Test that gaps in sequence numbers are handled (uses MAX)."""
        contacts = [
            ("session1", "testuser", "John", "john", 1),
            ("session2", "testuser", "John", "john", 3),  # Skip 2
            ("session3", "testuser", "John", "john", 7),  # Big gap
        ]
        with closing(sqlite3.connect(test_db)) as conn:
            self._insert_contacts(conn, contacts)
            # Should use MAX + 1, not count
            seq = get_next_sequence_number(conn, "testuser", "john")

        assert seq == 8

    def test_collision_prevention(self, test_db):
        """Test repeated allocate-then-insert cycles on a single connection.

        This runs strictly sequentially on one connection, so it checks that
        each committed insert is visible to the next lookup. It does not
        exercise concurrent writers.
        """
        sequences = []
        with closing(sqlite3.connect(test_db)) as conn:
            for i in range(5):
                seq = get_next_sequence_number(conn, "testuser", "collision")
                # Insert with this sequence so the next lookup must move past it
                self._insert_contacts(
                    conn,
                    [(f"session{i}", "testuser", "Collision", "collision", seq)],
                )
                sequences.append(seq)

        # All sequences should be unique and sequential
        assert sequences == [1, 2, 3, 4, 5]
        assert len(set(sequences)) == 5  # All unique
class TestProducerIdFormat:
    """Test producer ID format expectations.

    The IDs are assembled from their components and compared with the
    documented examples, so an assertion fails if the expected layout
    (``user_normalized_sequence``) and the example strings drift apart.

    NOTE(review): the production code that builds producer IDs is not visible
    in this file; these tests only pin the documented format. Point
    ``_compose_producer_id`` at the real builder once it can be imported.
    """

    @staticmethod
    def _compose_producer_id(user_id, normalized_name, sequence_number):
        """Join the three components with underscores in the documented order."""
        return f"{user_id}_{normalized_name}_{sequence_number}"

    def test_producer_id_format(self):
        """Test expected producer_id format: user_normalized_sequence."""
        # These are the expected formats from the spec
        cases = [
            (("testuser", "janedoe", 1), "testuser_janedoe_1"),
            (("testuser", "obrien", 2), "testuser_obrien_2"),
            (("user1", "christiankniep", 1), "user1_christiankniep_1"),
        ]
        for (user_id, normalized_name, sequence_number), expected in cases:
            producer_id = self._compose_producer_id(
                user_id, normalized_name, sequence_number
            )
            assert producer_id == expected
            # The sequence number is the final underscore-separated field.
            prefix, _, suffix = producer_id.rpartition("_")
            assert prefix == f"{user_id}_{normalized_name}"
            assert suffix == str(sequence_number)

    def test_collision_examples(self):
        """Document collision handling examples from spec."""
        # Per spec: O'Brien, OBrien, O Brien all normalize to "obrien"
        collisions = ["obrien", "obrien", "obrien"]
        assert len(set(collisions)) == 1  # All same normalized name

        # Successive sequence numbers are what keep colliding names apart.
        producer_ids = [
            self._compose_producer_id("testuser", normalized_name, sequence_number)
            for sequence_number, normalized_name in enumerate(collisions, start=1)
        ]
        assert producer_ids == [
            "testuser_obrien_1",
            "testuser_obrien_2",
            "testuser_obrien_3",
        ]
        assert len(set(producer_ids)) == len(collisions)  # All unique