Spaces:
Sleeping
Sleeping
"""
SQLite storage service for user profiles and contact sessions.

Feature: 012-profile-contact-ui
Feature: 001-refine-memory-producer-logic (producer_id generation)
Feature: 015-sqlite-s3-backup (S3 backup triggers)

S3 Backup Trigger Policy:
- Backup IS triggered for:
    - New user profile creation (create_or_update_user)
    - New contact session creation (create_contact_session_with_id)
    - Contact metadata changes (update_contact_session: name/description)
- Backup is NOT triggered for:
    - User login/last_login updates (create_or_update_user on existing user)
    - last_interaction timestamp updates (update_contact_last_interaction)
    - Message count increments or other ephemeral data

This policy optimizes S3 costs by only backing up when critical data changes.
"""
| import os | |
| import sqlite3 | |
| from datetime import datetime | |
| from typing import List, Optional | |
| from ..models import ContactSession, UserProfile | |
| from .session_service import generate_contact_session_id, generate_profile_session_id | |
| from ..utils.contact_utils import normalize_contact_name | |
class NotFoundError(Exception):
    """Signals that a requested database record does not exist."""
# Module-wide S3 backup manager; remains None until construction succeeds.
_backup_manager = None


def _get_backup_manager():
    """
    Return the shared BackupManager, constructing it on demand.

    Construction is attempted whenever no manager is cached, so a failed
    attempt is retried (and logged again) on the next call.

    Returns:
        The cached BackupManager instance, or None when construction raised
    """
    global _backup_manager
    if _backup_manager is not None:
        return _backup_manager

    try:
        from .s3_config import S3Config
        from .s3_backup import BackupManager

        s3_settings = S3Config.from_env()
        database_path = os.getenv("DATABASE_PATH", "data/contacts.db")
        _backup_manager = BackupManager(s3_settings, database_path)
    except Exception as e:
        # Backups are best-effort: report the failure and carry on without one
        import logging

        logging.getLogger(__name__).warning(f"Failed to initialize S3 backup: {e}")
        _backup_manager = None
    return _backup_manager
def get_db_connection() -> sqlite3.Connection:
    """
    Open a connection to the SQLite database named by DATABASE_PATH.

    Falls back to "data/contacts.db" when the variable is unset. Rows are
    returned as sqlite3.Row so columns can be read by name.
    """
    connection = sqlite3.connect(os.getenv("DATABASE_PATH", "data/contacts.db"))
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """
    Create the database schema by running the initial migration script.

    The database location comes from DATABASE_PATH (default
    "data/contacts.db"). Its parent directory is created when the path has
    one. The script is read from "migrations/001_create_tables.sql",
    resolved against the current working directory.

    Raises:
        OSError: If the migration script cannot be opened
        sqlite3.Error: If executing the script fails
    """
    db_path = os.getenv("DATABASE_PATH", "data/contacts.db")

    # A bare filename has an empty dirname, and os.makedirs("") raises,
    # so only create the directory when there is one to create.
    parent_dir = os.path.dirname(db_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    conn = sqlite3.connect(db_path)
    try:
        with open("migrations/001_create_tables.sql", "r") as f:
            conn.executescript(f.read())
        conn.commit()
    finally:
        # Release the connection even if the script is missing or invalid
        conn.close()
def get_next_sequence_number(conn: sqlite3.Connection, user_id: str, normalized_name: str) -> int:
    """
    Compute the sequence number the next same-named contact should receive.

    Runs a single SELECT on the supplied connection that takes the highest
    sequence_number stored for the (user_id, normalized_name) pair and adds
    one, treating "no matching rows" as zero.

    Feature: 001-refine-memory-producer-logic

    Args:
        conn: Active database connection whose rows support access by
            column name (e.g. row_factory = sqlite3.Row)
        user_id: OAuth username
        normalized_name: Normalized contact name (e.g., "johnsmith")

    Returns:
        Next sequence number (1 for first contact with this name, 2 for second, etc.)
    """
    query = """
        SELECT COALESCE(MAX(sequence_number), 0) + 1 as next_seq
        FROM contact_sessions
        WHERE user_id = ? AND normalized_name = ?
        """
    lookup = conn.cursor()
    lookup.execute(query, (user_id, normalized_name))
    result = lookup.fetchone()
    return result["next_seq"]
def create_or_update_user(
    user_id: str,
    display_name: str,
    profile_picture_url: Optional[str] = None,
    session_id: Optional[str] = None,
) -> UserProfile:
    """
    Create a new user profile, or refresh an existing one on login.

    For an existing user only last_login and display_name are updated; the
    stored session_id and profile_picture_url are left untouched. For a new
    user a row is inserted and an S3 backup is requested (Feature 015).

    Args:
        user_id: HuggingFace username from OAuth
        display_name: User's display name
        profile_picture_url: URL to avatar image (stored on creation only)
        session_id: Backend-generated session ID (used for new users only;
            generated with generate_profile_session_id when omitted)

    Returns:
        UserProfile built from the row as stored after the write

    Raises:
        ValueError: If user_id is empty
        sqlite3.Error: If a statement fails (e.g. the INSERT violates a
            table constraint); the connection is closed before propagating
    """
    if not user_id:
        raise ValueError("user_id cannot be empty")

    now = datetime.now()
    conn = get_db_connection()
    # try/finally guarantees the connection is released when a statement raises
    try:
        cursor = conn.cursor()

        cursor.execute("SELECT 1 FROM user_profiles WHERE user_id = ?", (user_id,))
        is_new_user = cursor.fetchone() is None

        if is_new_user:
            # Create new user - use provided session_id or generate one
            if not session_id:
                session_id = generate_profile_session_id(user_id)
            cursor.execute(
                """
                INSERT INTO user_profiles
                (user_id, display_name, profile_picture_url, created_at, last_login, session_id)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (user_id, display_name, profile_picture_url, now, now, session_id),
            )
        else:
            # Update last_login and display_name (don't change session_id)
            cursor.execute(
                """
                UPDATE user_profiles
                SET last_login = ?, display_name = ?
                WHERE user_id = ?
                """,
                (now, display_name, user_id),
            )
        conn.commit()

        # Read the row back so the result reflects what is actually stored
        cursor.execute("SELECT * FROM user_profiles WHERE user_id = ?", (user_id,))
        row = cursor.fetchone()
    finally:
        conn.close()

    # Only profile creation is backed up; plain logins are not (Feature 015)
    if is_new_user:
        backup_manager = _get_backup_manager()
        if backup_manager:
            backup_manager.request_backup()

    return UserProfile(
        user_id=row["user_id"],
        display_name=row["display_name"],
        profile_picture_url=row["profile_picture_url"],
        created_at=datetime.fromisoformat(row["created_at"]),
        last_login=datetime.fromisoformat(row["last_login"]),
        session_id=row["session_id"],
    )
def get_user_profile(user_id: str) -> Optional[UserProfile]:
    """
    Retrieve user profile by user_id.

    Args:
        user_id: HuggingFace username

    Returns:
        UserProfile object or None if not found
    """
    conn = get_db_connection()
    # try/finally so a failing query cannot leak the connection
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM user_profiles WHERE user_id = ?", (user_id,))
        row = cursor.fetchone()
    finally:
        conn.close()

    if not row:
        return None

    return UserProfile(
        user_id=row["user_id"],
        display_name=row["display_name"],
        profile_picture_url=row["profile_picture_url"],
        created_at=datetime.fromisoformat(row["created_at"]),
        last_login=datetime.fromisoformat(row["last_login"]),
        session_id=row["session_id"],
    )
def create_contact_session(
    user_id: str,
    contact_name: str,
    contact_description: Optional[str] = None,
    is_reference: bool = False,
) -> ContactSession:
    """
    Create a new contact session with a locally generated session ID.

    The session ID comes from generate_contact_session_id; the insert is
    retried with a fresh ID (up to 3 attempts in total) when SQLite reports
    an integrity violation. contact_id is stored equal to the session ID.
    Unlike create_contact_session_with_id, this function does not request
    an S3 backup.

    Args:
        user_id: Owner's HuggingFace username
        contact_name: Display name for contact (1-255 chars after stripping)
        contact_description: Optional description (≤500 chars)
        is_reference: Whether this is a reference session (default: False)

    Returns:
        ContactSession object with generated session_id

    Raises:
        ValueError: If validation fails or contact limit reached
        sqlite3.IntegrityError: If the insert still violates a constraint
            on the final attempt
    """
    # Validate contact_name
    contact_name = contact_name.strip()
    if not contact_name or len(contact_name) > 255:
        raise ValueError("Contact name must be 1-255 characters")

    # Validate contact_description
    if contact_description and len(contact_description) > 500:
        raise ValueError("Description cannot exceed 500 characters")

    conn = get_db_connection()
    # try/finally releases the connection on every exit path, including
    # failures other than IntegrityError
    try:
        cursor = conn.cursor()

        # Check contact count limit (500 per user)
        cursor.execute(
            "SELECT COUNT(*) as count FROM contact_sessions WHERE user_id = ?", (user_id,)
        )
        if cursor.fetchone()["count"] >= 500:
            raise ValueError("Maximum of 500 contacts reached")

        # Generate producer identifier (Feature: 001-refine-memory-producer-logic)
        normalized_name = normalize_contact_name(contact_name)
        sequence_number = get_next_sequence_number(conn, user_id, normalized_name)
        producer_id = f"{user_id}_{normalized_name}_{sequence_number}"

        max_attempts = 3
        row = None
        for attempt in range(max_attempts):
            session_id = generate_contact_session_id(user_id)
            now = datetime.now()
            try:
                cursor.execute(
                    """
                    INSERT INTO contact_sessions
                    (session_id, user_id, contact_id, contact_name, contact_description, is_reference,
                    created_at, last_interaction, normalized_name, sequence_number, producer_id)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        session_id,
                        user_id,
                        session_id,  # contact_id defaults to session_id
                        contact_name,
                        contact_description,
                        is_reference,
                        now,
                        now,
                        normalized_name,
                        sequence_number,
                        producer_id,
                    ),
                )
                conn.commit()
            except sqlite3.IntegrityError:
                if attempt == max_attempts - 1:
                    raise
                # Presumably a session ID collision: try again with a new ID
                continue

            # Fetch created session
            cursor.execute("SELECT * FROM contact_sessions WHERE session_id = ?", (session_id,))
            row = cursor.fetchone()
            break
    finally:
        conn.close()

    return ContactSession(
        session_id=row["session_id"],
        user_id=row["user_id"],
        # sqlite3.Row has no .get(); index by name and fall back on NULL
        contact_id=row["contact_id"] or row["session_id"],
        contact_name=row["contact_name"],
        contact_description=row["contact_description"],
        is_reference=bool(row["is_reference"]),
        created_at=datetime.fromisoformat(row["created_at"]),
        last_interaction=datetime.fromisoformat(row["last_interaction"]),
        normalized_name=row["normalized_name"],
        sequence_number=row["sequence_number"],
        producer_id=row["producer_id"],
    )
def create_contact_session_with_id(
    user_id: str,
    session_id: str,
    contact_name: str,
    contact_description: Optional[str] = None,
    is_reference: bool = False,
) -> ContactSession:
    """
    Create a new contact session with a provided session_id (from backend API).

    contact_id is stored equal to session_id. After a successful insert an
    S3 backup is requested (Feature 015).

    Args:
        user_id: HuggingFace username
        session_id: Pre-generated session ID from backend API
        contact_name: Display name (1-255 chars after stripping, required)
        contact_description: Optional description (≤500 chars)
        is_reference: Whether this is a reference session (default: False)

    Returns:
        Created ContactSession object

    Raises:
        ValueError: If validation fails or contact limit exceeded
        sqlite3.IntegrityError: If the insert violates a table constraint
            (e.g. session_id already exists)
    """
    # Validation
    if not user_id:
        raise ValueError("user_id cannot be empty")
    if not session_id:
        raise ValueError("session_id cannot be empty")
    if not contact_name or not contact_name.strip():
        raise ValueError("Contact name is required")

    contact_name = contact_name.strip()
    if len(contact_name) > 255:
        raise ValueError("Contact name cannot exceed 255 characters")
    if contact_description and len(contact_description) > 500:
        raise ValueError("Description cannot exceed 500 characters")

    conn = get_db_connection()
    # try/finally closes the connection on every failure, not only on
    # IntegrityError
    try:
        cursor = conn.cursor()

        # Check contact count limit (500 per user)
        cursor.execute(
            "SELECT COUNT(*) as count FROM contact_sessions WHERE user_id = ?", (user_id,)
        )
        if cursor.fetchone()["count"] >= 500:
            raise ValueError("Maximum of 500 contacts reached")

        # Generate producer identifier (Feature: 001-refine-memory-producer-logic)
        normalized_name = normalize_contact_name(contact_name)
        sequence_number = get_next_sequence_number(conn, user_id, normalized_name)
        producer_id = f"{user_id}_{normalized_name}_{sequence_number}"

        now = datetime.now()
        cursor.execute(
            """
            INSERT INTO contact_sessions
            (session_id, user_id, contact_id, contact_name, contact_description, is_reference,
            created_at, last_interaction, normalized_name, sequence_number, producer_id)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                session_id,
                user_id,
                session_id,  # contact_id defaults to session_id
                contact_name,
                contact_description,
                is_reference,
                now,
                now,
                normalized_name,
                sequence_number,
                producer_id,
            ),
        )
        conn.commit()

        # Fetch created session
        cursor.execute("SELECT * FROM contact_sessions WHERE session_id = ?", (session_id,))
        row = cursor.fetchone()
    finally:
        conn.close()

    # Trigger S3 backup for new contact creation (Feature 015)
    backup_manager = _get_backup_manager()
    if backup_manager:
        backup_manager.request_backup()

    return ContactSession(
        session_id=row["session_id"],
        user_id=row["user_id"],
        contact_id=row["contact_id"] or row["session_id"],
        contact_name=row["contact_name"],
        contact_description=row["contact_description"],
        is_reference=bool(row["is_reference"]),
        created_at=datetime.fromisoformat(row["created_at"]),
        last_interaction=datetime.fromisoformat(row["last_interaction"]),
        normalized_name=row["normalized_name"],
        sequence_number=row["sequence_number"],
        producer_id=row["producer_id"],
    )
def list_contact_sessions(
    user_id: str, sort_by: str = "last_interaction"
) -> List[ContactSession]:
    """
    List all contact sessions for a user, sorted by most recent interaction.

    When tracing is enabled, the call is wrapped in a
    "storage.list_contact_sessions" span carrying user_id, sort_by and the
    result count.

    Args:
        user_id: HuggingFace username
        sort_by: Recorded on the tracing span only; the query always orders
            by last_interaction, descending

    Returns:
        List of ContactSession objects, most recently used first
    """
    from opentelemetry import trace
    from ..utils.tracing import is_tracing_enabled

    tracer = trace.get_tracer(__name__)

    # Only create span if tracing is enabled
    if is_tracing_enabled():
        span = tracer.start_span("storage.list_contact_sessions")
        span.set_attribute("user_id", user_id)
        span.set_attribute("sort_by", sort_by)
    else:
        span = None

    try:
        conn = get_db_connection()
        # Inner try/finally so a failing query cannot leak the connection
        try:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT * FROM contact_sessions
                WHERE user_id = ?
                ORDER BY last_interaction DESC
                """,
                (user_id,),
            )
            rows = cursor.fetchall()
        finally:
            conn.close()

        def safe_row_access(row, key):
            """Return row[key], or None when the table lacks that column."""
            return row[key] if key in row.keys() else None

        contacts = [
            ContactSession(
                session_id=row["session_id"],
                user_id=row["user_id"],
                contact_id=safe_row_access(row, "contact_id") or row["session_id"],
                contact_name=row["contact_name"],
                contact_description=row["contact_description"],
                is_reference=bool(row["is_reference"]),
                created_at=datetime.fromisoformat(row["created_at"]),
                last_interaction=datetime.fromisoformat(row["last_interaction"]),
                normalized_name=safe_row_access(row, "normalized_name"),
                sequence_number=safe_row_access(row, "sequence_number"),
                producer_id=safe_row_access(row, "producer_id"),
            )
            for row in rows
        ]

        if span:
            span.set_attribute("result_count", len(contacts))

        return contacts
    finally:
        if span:
            span.end()
def get_contact_session(session_id: str) -> Optional[ContactSession]:
    """
    Retrieve a specific contact session by session_id.

    Columns that may be absent from the table (contact_id, normalized_name,
    sequence_number, producer_id) are read defensively; contact_id falls
    back to session_id when the column is missing or NULL, matching
    list_contact_sessions.

    Args:
        session_id: Contact session ID

    Returns:
        ContactSession object or None if not found
    """
    conn = get_db_connection()
    # try/finally so a failing query cannot leak the connection
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM contact_sessions WHERE session_id = ?", (session_id,))
        row = cursor.fetchone()
    finally:
        conn.close()

    if not row:
        return None

    columns = row.keys()

    def optional(key):
        """Return row[key], or None when the table lacks that column."""
        return row[key] if key in columns else None

    return ContactSession(
        session_id=row["session_id"],
        user_id=row["user_id"],
        contact_id=optional("contact_id") or row["session_id"],
        contact_name=row["contact_name"],
        contact_description=row["contact_description"],
        is_reference=bool(row["is_reference"]),
        created_at=datetime.fromisoformat(row["created_at"]),
        last_interaction=datetime.fromisoformat(row["last_interaction"]),
        normalized_name=optional("normalized_name"),
        sequence_number=optional("sequence_number"),
        producer_id=optional("producer_id"),
    )
def update_contact_session(
    session_id: str,
    contact_name: Optional[str] = None,
    contact_description: Optional[str] = None,
) -> ContactSession:
    """
    Update contact metadata (name/description only).

    Only the fields passed as non-None are written. normalized_name,
    sequence_number and producer_id are not recomputed on rename. When at
    least one field is written, an S3 backup is requested (Feature 015).

    Args:
        session_id: Contact session ID
        contact_name: New contact name (if provided; 1-255 chars after stripping)
        contact_description: New description (if provided; ≤500 chars)

    Returns:
        Updated ContactSession object, populated with the same fields as
        the other read functions in this module

    Raises:
        ValueError: If validation fails
        NotFoundError: If session_id doesn't exist
    """
    # Validate contact_name if provided
    if contact_name is not None:
        contact_name = contact_name.strip()
        if not contact_name or len(contact_name) > 255:
            raise ValueError("Contact name must be 1-255 characters")

    # Validate contact_description if provided
    if contact_description is not None and len(contact_description) > 500:
        raise ValueError("Description cannot exceed 500 characters")

    # Build update query dynamically; column names are fixed literals and
    # values are always bound as parameters
    updates = []
    params = []
    if contact_name is not None:
        updates.append("contact_name = ?")
        params.append(contact_name)
    if contact_description is not None:
        updates.append("contact_description = ?")
        params.append(contact_description)

    conn = get_db_connection()
    # try/finally so the connection is released on every exit path
    try:
        cursor = conn.cursor()

        # Check if session exists
        cursor.execute("SELECT 1 FROM contact_sessions WHERE session_id = ?", (session_id,))
        if cursor.fetchone() is None:
            raise NotFoundError(f"Contact session {session_id} not found")

        if updates:
            params.append(session_id)
            query = f"UPDATE contact_sessions SET {', '.join(updates)} WHERE session_id = ?"
            cursor.execute(query, params)
            conn.commit()

        # Fetch updated session
        cursor.execute("SELECT * FROM contact_sessions WHERE session_id = ?", (session_id,))
        row = cursor.fetchone()
    finally:
        conn.close()

    # Trigger S3 backup for critical metadata changes (Feature 015)
    # Only when name or description changes (not for last_interaction updates)
    if updates:
        backup_manager = _get_backup_manager()
        if backup_manager:
            backup_manager.request_backup()

    columns = row.keys()

    def optional(key):
        """Return row[key], or None when the table lacks that column."""
        return row[key] if key in columns else None

    return ContactSession(
        session_id=row["session_id"],
        user_id=row["user_id"],
        contact_id=optional("contact_id") or row["session_id"],
        contact_name=row["contact_name"],
        contact_description=row["contact_description"],
        is_reference=bool(row["is_reference"]),
        created_at=datetime.fromisoformat(row["created_at"]),
        last_interaction=datetime.fromisoformat(row["last_interaction"]),
        normalized_name=optional("normalized_name"),
        sequence_number=optional("sequence_number"),
        producer_id=optional("producer_id"),
    )
def update_contact_last_interaction(session_id: str) -> None:
    """
    Update last_interaction timestamp for a contact session.

    Feature: 001-contact-session-fixes

    Intended to be called after adding facts or sending messages so the
    contact stays sorted by most recent activity in the contact list. No S3
    backup is requested for this change.

    Args:
        session_id: Contact session ID

    Raises:
        NotFoundError: If session_id doesn't exist
    """
    conn = get_db_connection()
    # try/finally so the connection is released on every exit path
    try:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE contact_sessions SET last_interaction = ? WHERE session_id = ?",
            (datetime.now(), session_id),
        )
        # Zero affected rows means no such session; nothing to commit then
        if cursor.rowcount == 0:
            raise NotFoundError(f"Contact session {session_id} not found")
        conn.commit()
    finally:
        conn.close()
def delete_contact_session(session_id: str) -> None:
    """
    Delete a contact session from SQLite.

    Note: This does NOT delete backend facts/messages, and no S3 backup is
    requested here.

    Args:
        session_id: Contact session ID

    Raises:
        NotFoundError: If session_id doesn't exist
    """
    conn = get_db_connection()
    # try/finally so the connection is released on every exit path
    try:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM contact_sessions WHERE session_id = ?", (session_id,))
        # Zero affected rows means no such session; nothing to commit then
        if cursor.rowcount == 0:
            raise NotFoundError(f"Contact session {session_id} not found")
        conn.commit()
    finally:
        conn.close()
def update_last_interaction(session_id: str) -> None:
    """
    Update last_interaction timestamp to current time.

    Args:
        session_id: Contact session ID

    Raises:
        NotFoundError: If session_id doesn't exist
    """
    conn = get_db_connection()
    # try/finally so a failing UPDATE or commit cannot leak the connection
    try:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE contact_sessions SET last_interaction = ? WHERE session_id = ?",
            (datetime.now(), session_id),
        )
        # Zero affected rows means no such session; nothing to commit then
        if cursor.rowcount == 0:
            raise NotFoundError(f"Contact session {session_id} not found")
        conn.commit()
    finally:
        conn.close()