Spaces:
Sleeping
Sleeping
| """ | |
| Feedback Schema for RAG Chatbot | |
| This module defines dataclasses for feedback data structures | |
| and provides Snowflake schema generation. | |
| """ | |
| import os | |
| from datetime import datetime | |
| from dataclasses import dataclass, asdict, field | |
| from typing import List, Optional, Dict, Any, Union | |
@dataclass
class RetrievedDocument:
    """Single retrieved document metadata.

    Declared as a dataclass so that instances can be built from field
    values and compared/serialised with the ``dataclasses`` helpers.
    All six fields are required; none has a default.
    """

    doc_id: str
    filename: str
    page: int
    score: float
    content: str
    # Free-form extra attributes; the expected keys are not defined here.
    metadata: Dict[str, Any]
@dataclass
class RetrievalEntry:
    """Single retrieval operation metadata.

    Declared as a dataclass so that instances can be built from field
    values. The first three fields are required; the remaining ones are
    optional and default to ``None``.
    """

    rag_query: str
    # String forward reference: keeps this class definition independent of
    # where ``RetrievedDocument`` is defined in the module.
    documents_retrieved: List["RetrievedDocument"]
    conversation_length: int
    filters_applied: Optional[Dict[str, Any]] = None
    timestamp: Optional[float] = None
    # Optional untyped payload; how it is populated is not visible here.
    _raw_data: Optional[Dict[str, Any]] = None
@dataclass
class UserFeedback:
    """User feedback submission data.

    Declared as a dataclass: ``to_dict`` relies on ``dataclasses.asdict``
    and ``created_at`` relies on ``field(default_factory=...)``, both of
    which only work on dataclasses. The field names match the column
    names returned by ``to_snowflake_schema``.
    """

    feedback_id: str
    open_ended_feedback: Optional[str]
    score: int
    is_feedback_about_last_retrieval: bool
    conversation_id: str
    # NOTE(review): annotated as float here but mapped to NUMBER(20, 0)
    # (scale 0) in the Snowflake schema - confirm that dropping any
    # fractional part on storage is intended.
    timestamp: float
    message_count: int
    has_retrievals: bool
    retrieval_count: int
    # List of {"role": "user"/"assistant", "content": "..."}
    transcript: List[Dict[str, str]]
    # List of retrieval objects with retrieved_docs and user_message_trigger
    retrievals: List[Dict[str, Any]]
    # Conversation subset + retrieved docs
    feedback_score_related_retrieval_docs: Optional[Dict[str, Any]] = None
    # Preserved old column for backward compatibility
    retrieved_data: Optional[List[Dict[str, Any]]] = None
    # ISO-8601 string of datetime.now() (naive, local time) taken when the
    # instance is constructed, unless the caller supplies a value.
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dictionary, recursing into nested data structures.

        Returns:
            A new dict keyed by field name, produced by
            ``dataclasses.asdict``.
        """
        return asdict(self)

    def to_snowflake_schema(self) -> Dict[str, Any]:
        """Generate the Snowflake column schema for this dataclass.

        The method does not read any instance state, which is why
        ``get_snowflake_create_table_sql`` can call it with ``None`` in
        place of ``self``.

        Returns:
            An ordered mapping of column name to Snowflake type string.
        """
        # JSON-string columns and their expected structure:
        #
        # transcript:
        #   [{"role": "user", "content": "..."},
        #    {"role": "assistant", "content": "..."}, ...]
        #
        # retrievals:
        #   [
        #     {
        #       "retrieved_docs": [{"content": "...", "metadata": {...}, ...}],
        #           # content truncated to 100 chars
        #       "user_message_trigger":
        #           "final user message that triggered this retrieval"
        #     },
        #     ...
        #   ]
        #
        # feedback_score_related_retrieval_docs:
        #   {
        #     "conversation_up_to_point": [{"role": "user", "content": "..."}, ...],
        #         # subset of transcript
        #     "retrieved_docs": [{"content": "...", "metadata": {...}, ...}]
        #         # full chunks with all info
        #   }
        return {
            "feedback_id": "VARCHAR(255)",
            "open_ended_feedback": "VARCHAR(16777216)",  # Large text
            "score": "INTEGER",
            "is_feedback_about_last_retrieval": "BOOLEAN",
            "conversation_id": "VARCHAR(255)",
            "timestamp": "NUMBER(20, 0)",
            "message_count": "INTEGER",
            "has_retrievals": "BOOLEAN",
            "retrieval_count": "INTEGER",
            "transcript": "VARCHAR(16777216)",  # JSON string (array)
            "retrievals": "VARCHAR(16777216)",  # JSON string (array)
            "feedback_score_related_retrieval_docs": "VARCHAR(16777216)",  # JSON string (object)
            "retrieved_data": "VARCHAR(16777216)",  # JSON string, legacy column
            "created_at": "TIMESTAMP_NTZ",
        }

    @classmethod
    def get_snowflake_create_table_sql(cls, table_name: str = "USER_FEEDBACK_V3") -> str:
        """Generate CREATE TABLE SQL for Snowflake.

        Must be a classmethod: it takes ``cls`` and is invoked on the class
        itself (``UserFeedback.get_snowflake_create_table_sql(name)``).

        Args:
            table_name: Name interpolated verbatim into the statement. It is
                not quoted or validated here, so callers must pass a trusted
                identifier.

        Returns:
            A ``CREATE TABLE IF NOT EXISTS`` statement with one column per
            schema entry, a primary key on ``feedback_id`` and a
            ``CLUSTER BY`` clause, followed by explanatory SQL comments.
        """
        # to_snowflake_schema ignores its receiver, so no instance is needed.
        schema = cls.to_snowflake_schema(None)

        # Built once instead of once per column.
        not_null_columns = {"feedback_id", "score", "timestamp"}
        columns = [
            f" {col_name} {col_type} "
            f"{'NOT NULL' if col_name in not_null_columns else 'NULL'}"
            for col_name, col_type in schema.items()
        ]
        columns_str = ",\n".join(columns)

        sql_lines = [
            f"CREATE TABLE IF NOT EXISTS {table_name} (",
            f"{columns_str},",
            " PRIMARY KEY (feedback_id)",
            ")",
            "CLUSTER BY (timestamp, conversation_id, score);",
            "-- Note: Snowflake doesn't support traditional indexes on regular tables.",
            "-- Instead, we use CLUSTER BY to optimize queries on these columns.",
            "-- Snowflake automatically maintains clustering for efficient querying.",
            "-- Note: transcript, retrievals, and feedback_score_related_retrieval_docs are stored as VARCHAR (JSON strings),",
            "-- same approach as the old retrieved_data column. This allows easy storage and retrieval without VARIANT type complexity.",
        ]
        return "\n".join(sql_lines) + "\n"
# Snowflake types for the fields of one entry in the retrieved_data array.
# The keys mirror the RetrievalEntry fields (without _raw_data).
RETRIEVAL_ENTRY_SCHEMA = {
    "rag_query": "VARCHAR",
    # Each array element is a document object.
    "documents_retrieved": "ARRAY",
    "conversation_length": "INTEGER",
    "filters_applied": "OBJECT",
    "timestamp": "NUMBER",
}
# Snowflake types for one document object; the keys mirror the
# RetrievedDocument fields.
DOCUMENT_SCHEMA = {
    "doc_id": "VARCHAR",
    "filename": "VARCHAR",
    "page": "INTEGER",
    "score": "DOUBLE",
    "content": "VARCHAR(16777216)",
    "metadata": "OBJECT",
}
def generate_snowflake_schema_sql(table_name: Optional[str] = None) -> str:
    """Build the full Snowflake DDL for the feedback table.

    Args:
        table_name: Target table name. When ``None``, the name is read from
            the ``SNOWFLAKE_FEEDBACK_TABLE`` environment variable, falling
            back to ``"USER_FEEDBACK_V3"`` if that variable is unset.

    Returns:
        The SQL text produced by
        ``UserFeedback.get_snowflake_create_table_sql`` for the resolved name.
    """
    # The environment is only consulted when no explicit name was given.
    resolved_name = (
        table_name
        if table_name is not None
        else os.getenv("SNOWFLAKE_FEEDBACK_TABLE", "USER_FEEDBACK_V3")
    )
    return UserFeedback.get_snowflake_create_table_sql(resolved_name)
def create_feedback_from_dict(data: Dict[str, Any]) -> UserFeedback:
    """Build a ``UserFeedback`` from a plain dictionary.

    Args:
        data: Source mapping. ``score``, ``is_feedback_about_last_retrieval``,
            ``conversation_id``, ``timestamp``, ``message_count``,
            ``has_retrievals`` and ``retrieval_count`` are required. All other
            keys are optional.

    Returns:
        A new ``UserFeedback``. ``created_at`` is not read from ``data``, so
        it takes the dataclass default.

    Raises:
        KeyError: If any required key is missing from ``data``.
    """
    # Fallback identifier derived from the timestamp when no id is supplied.
    fallback_id = f"feedback_{data.get('timestamp', 'unknown')}"

    kwargs: Dict[str, Any] = {
        "feedback_id": data.get("feedback_id", fallback_id),
        "open_ended_feedback": data.get("open_ended_feedback"),
    }

    # Required keys, looked up in the same order as the constructor
    # parameters so the first missing one is what raises KeyError.
    required_keys = (
        "score",
        "is_feedback_about_last_retrieval",
        "conversation_id",
        "timestamp",
        "message_count",
        "has_retrievals",
        "retrieval_count",
    )
    for key in required_keys:
        kwargs[key] = data[key]

    # Optional structured payloads.
    kwargs["transcript"] = data.get("transcript", [])
    kwargs["retrievals"] = data.get("retrievals", [])
    kwargs["feedback_score_related_retrieval_docs"] = data.get(
        "feedback_score_related_retrieval_docs"
    )
    kwargs["retrieved_data"] = data.get("retrieved_data")

    return UserFeedback(**kwargs)