""" Feedback Schema for RAG Chatbot This module defines dataclasses for feedback data structures and provides Snowflake schema generation. """ import os from datetime import datetime from dataclasses import dataclass, asdict, field from typing import List, Optional, Dict, Any, Union @dataclass class RetrievedDocument: """Single retrieved document metadata""" doc_id: str filename: str page: int score: float content: str metadata: Dict[str, Any] @dataclass class RetrievalEntry: """Single retrieval operation metadata""" rag_query: str documents_retrieved: List[RetrievedDocument] conversation_length: int filters_applied: Optional[Dict[str, Any]] = None timestamp: Optional[float] = None _raw_data: Optional[Dict[str, Any]] = None @dataclass class UserFeedback: """User feedback submission data""" feedback_id: str open_ended_feedback: Optional[str] score: int is_feedback_about_last_retrieval: bool conversation_id: str timestamp: float message_count: int has_retrievals: bool retrieval_count: int transcript: List[Dict[str, str]] # List of {"role": "user"/"assistant", "content": "..."} retrievals: List[Dict[str, Any]] # List of retrieval objects with retrieved_docs and user_message_trigger feedback_score_related_retrieval_docs: Optional[Dict[str, Any]] = None # Conversation subset + retrieved docs retrieved_data: Optional[List[Dict[str, Any]]] = None # Preserved old column for backward compatibility created_at: str = field(default_factory=lambda: datetime.now().isoformat()) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary with nested data structures""" result = asdict(self) return result def to_snowflake_schema(self) -> Dict[str, Any]: """Generate Snowflake schema for this dataclass""" schema = { "feedback_id": "VARCHAR(255)", "open_ended_feedback": "VARCHAR(16777216)", # Large text "score": "INTEGER", "is_feedback_about_last_retrieval": "BOOLEAN", "conversation_id": "VARCHAR(255)", "timestamp": "NUMBER(20, 0)", "message_count": "INTEGER", "has_retrievals": "BOOLEAN", "retrieval_count": "INTEGER", "transcript": "VARCHAR(16777216)", # JSON string of ARRAY of {"role": "user"/"assistant", "content": "..."} "retrievals": "VARCHAR(16777216)", # JSON string of ARRAY of retrieval objects "feedback_score_related_retrieval_docs": "VARCHAR(16777216)", # JSON string of OBJECT with conversation subset + retrieved docs "retrieved_data": "VARCHAR(16777216)", # JSON string - preserved old column for backward compatibility "created_at": "TIMESTAMP_NTZ", # transcript structure: [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}, ...] # retrievals structure: [ # { # "retrieved_docs": [{"content": "...", "metadata": {...}, ...}], # content truncated to 100 chars # "user_message_trigger": "final user message that triggered this retrieval" # }, # ... # ] # feedback_score_related_retrieval_docs structure: { # "conversation_up_to_point": [{"role": "user", "content": "..."}, ...], # subset of transcript # "retrieved_docs": [{"content": "...", "metadata": {...}, ...}] # full chunks with all info # } } return schema @classmethod def get_snowflake_create_table_sql(cls, table_name: str = "USER_FEEDBACK_V3") -> str: """Generate CREATE TABLE SQL for Snowflake""" schema = cls.to_snowflake_schema(None) columns = [] for col_name, col_type in schema.items(): nullable = "NULL" if col_name not in ["feedback_id", "score", "timestamp"] else "NOT NULL" columns.append(f" {col_name} {col_type} {nullable}") # Build SQL string properly columns_str = ",\n".join(columns) sql = f"""CREATE TABLE IF NOT EXISTS {table_name} ( {columns_str}, PRIMARY KEY (feedback_id) ) CLUSTER BY (timestamp, conversation_id, score); -- Note: Snowflake doesn't support traditional indexes on regular tables. -- Instead, we use CLUSTER BY to optimize queries on these columns. -- Snowflake automatically maintains clustering for efficient querying. -- Note: transcript, retrievals, and feedback_score_related_retrieval_docs are stored as VARCHAR (JSON strings), -- same approach as the old retrieved_data column. This allows easy storage and retrieval without VARIANT type complexity. """ return sql # Snowflake variant schema for retrieved_data array RETRIEVAL_ENTRY_SCHEMA = { "rag_query": "VARCHAR", "documents_retrieved": "ARRAY", # Array of document objects "conversation_length": "INTEGER", "filters_applied": "OBJECT", "timestamp": "NUMBER" } DOCUMENT_SCHEMA = { "doc_id": "VARCHAR", "filename": "VARCHAR", "page": "INTEGER", "score": "DOUBLE", "content": "VARCHAR(16777216)", "metadata": "OBJECT" } def generate_snowflake_schema_sql(table_name: Optional[str] = None) -> str: """Generate complete Snowflake schema SQL for feedback system""" if table_name is None: table_name = os.getenv("SNOWFLAKE_FEEDBACK_TABLE", "USER_FEEDBACK_V3") return UserFeedback.get_snowflake_create_table_sql(table_name) def create_feedback_from_dict(data: Dict[str, Any]) -> UserFeedback: """Create UserFeedback instance from dictionary""" return UserFeedback( feedback_id=data.get("feedback_id", f"feedback_{data.get('timestamp', 'unknown')}"), open_ended_feedback=data.get("open_ended_feedback"), score=data["score"], is_feedback_about_last_retrieval=data["is_feedback_about_last_retrieval"], conversation_id=data["conversation_id"], timestamp=data["timestamp"], message_count=data["message_count"], has_retrievals=data["has_retrievals"], retrieval_count=data["retrieval_count"], transcript=data.get("transcript", []), retrievals=data.get("retrievals", []), feedback_score_related_retrieval_docs=data.get("feedback_score_related_retrieval_docs"), retrieved_data=data.get("retrieved_data") )