audit_assistant / src /feedback /feedback_schema.py
akryldigital's picture
Gemini FSA (#6)
8d898c4 verified
"""
Feedback Schema for RAG Chatbot
This module defines dataclasses for feedback data structures
and provides Snowflake schema generation.
"""
import os
from datetime import datetime
from dataclasses import dataclass, asdict, field
from typing import List, Optional, Dict, Any, Union
@dataclass
class RetrievedDocument:
"""Single retrieved document metadata"""
doc_id: str
filename: str
page: int
score: float
content: str
metadata: Dict[str, Any]
@dataclass
class RetrievalEntry:
"""Single retrieval operation metadata"""
rag_query: str
documents_retrieved: List[RetrievedDocument]
conversation_length: int
filters_applied: Optional[Dict[str, Any]] = None
timestamp: Optional[float] = None
_raw_data: Optional[Dict[str, Any]] = None
@dataclass
class UserFeedback:
"""User feedback submission data"""
feedback_id: str
open_ended_feedback: Optional[str]
score: int
is_feedback_about_last_retrieval: bool
conversation_id: str
timestamp: float
message_count: int
has_retrievals: bool
retrieval_count: int
transcript: List[Dict[str, str]] # List of {"role": "user"/"assistant", "content": "..."}
retrievals: List[Dict[str, Any]] # List of retrieval objects with retrieved_docs and user_message_trigger
feedback_score_related_retrieval_docs: Optional[Dict[str, Any]] = None # Conversation subset + retrieved docs
retrieved_data: Optional[List[Dict[str, Any]]] = None # Preserved old column for backward compatibility
created_at: str = field(default_factory=lambda: datetime.now().isoformat())
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary with nested data structures"""
result = asdict(self)
return result
def to_snowflake_schema(self) -> Dict[str, Any]:
"""Generate Snowflake schema for this dataclass"""
schema = {
"feedback_id": "VARCHAR(255)",
"open_ended_feedback": "VARCHAR(16777216)", # Large text
"score": "INTEGER",
"is_feedback_about_last_retrieval": "BOOLEAN",
"conversation_id": "VARCHAR(255)",
"timestamp": "NUMBER(20, 0)",
"message_count": "INTEGER",
"has_retrievals": "BOOLEAN",
"retrieval_count": "INTEGER",
"transcript": "VARCHAR(16777216)", # JSON string of ARRAY of {"role": "user"/"assistant", "content": "..."}
"retrievals": "VARCHAR(16777216)", # JSON string of ARRAY of retrieval objects
"feedback_score_related_retrieval_docs": "VARCHAR(16777216)", # JSON string of OBJECT with conversation subset + retrieved docs
"retrieved_data": "VARCHAR(16777216)", # JSON string - preserved old column for backward compatibility
"created_at": "TIMESTAMP_NTZ",
# transcript structure: [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}, ...]
# retrievals structure: [
# {
# "retrieved_docs": [{"content": "...", "metadata": {...}, ...}], # content truncated to 100 chars
# "user_message_trigger": "final user message that triggered this retrieval"
# },
# ...
# ]
# feedback_score_related_retrieval_docs structure: {
# "conversation_up_to_point": [{"role": "user", "content": "..."}, ...], # subset of transcript
# "retrieved_docs": [{"content": "...", "metadata": {...}, ...}] # full chunks with all info
# }
}
return schema
@classmethod
def get_snowflake_create_table_sql(cls, table_name: str = "USER_FEEDBACK_V3") -> str:
"""Generate CREATE TABLE SQL for Snowflake"""
schema = cls.to_snowflake_schema(None)
columns = []
for col_name, col_type in schema.items():
nullable = "NULL" if col_name not in ["feedback_id", "score", "timestamp"] else "NOT NULL"
columns.append(f" {col_name} {col_type} {nullable}")
# Build SQL string properly
columns_str = ",\n".join(columns)
sql = f"""CREATE TABLE IF NOT EXISTS {table_name} (
{columns_str},
PRIMARY KEY (feedback_id)
)
CLUSTER BY (timestamp, conversation_id, score);
-- Note: Snowflake doesn't support traditional indexes on regular tables.
-- Instead, we use CLUSTER BY to optimize queries on these columns.
-- Snowflake automatically maintains clustering for efficient querying.
-- Note: transcript, retrievals, and feedback_score_related_retrieval_docs are stored as VARCHAR (JSON strings),
-- same approach as the old retrieved_data column. This allows easy storage and retrieval without VARIANT type complexity.
"""
return sql
# Snowflake variant schema for retrieved_data array
RETRIEVAL_ENTRY_SCHEMA = {
"rag_query": "VARCHAR",
"documents_retrieved": "ARRAY", # Array of document objects
"conversation_length": "INTEGER",
"filters_applied": "OBJECT",
"timestamp": "NUMBER"
}
DOCUMENT_SCHEMA = {
"doc_id": "VARCHAR",
"filename": "VARCHAR",
"page": "INTEGER",
"score": "DOUBLE",
"content": "VARCHAR(16777216)",
"metadata": "OBJECT"
}
def generate_snowflake_schema_sql(table_name: Optional[str] = None) -> str:
"""Generate complete Snowflake schema SQL for feedback system"""
if table_name is None:
table_name = os.getenv("SNOWFLAKE_FEEDBACK_TABLE", "USER_FEEDBACK_V3")
return UserFeedback.get_snowflake_create_table_sql(table_name)
def create_feedback_from_dict(data: Dict[str, Any]) -> UserFeedback:
"""Create UserFeedback instance from dictionary"""
return UserFeedback(
feedback_id=data.get("feedback_id", f"feedback_{data.get('timestamp', 'unknown')}"),
open_ended_feedback=data.get("open_ended_feedback"),
score=data["score"],
is_feedback_about_last_retrieval=data["is_feedback_about_last_retrieval"],
conversation_id=data["conversation_id"],
timestamp=data["timestamp"],
message_count=data["message_count"],
has_retrievals=data["has_retrievals"],
retrieval_count=data["retrieval_count"],
transcript=data.get("transcript", []),
retrievals=data.get("retrievals", []),
feedback_score_related_retrieval_docs=data.get("feedback_score_related_retrieval_docs"),
retrieved_data=data.get("retrieved_data")
)