akryldigital committed on
Commit
59ac60b
·
verified ·
1 Parent(s): 04f2072

add extra columns for feedback functionality

Browse files
Files changed (1) hide show
  1. src/reporting/feedback_schema.py +36 -71
src/reporting/feedback_schema.py CHANGED
@@ -4,10 +4,12 @@ Feedback Schema for RAG Chatbot
4
  This module defines dataclasses for feedback data structures
5
  and provides Snowflake schema generation.
6
  """
7
-
 
8
  from dataclasses import dataclass, asdict, field
9
  from typing import List, Optional, Dict, Any, Union
10
- from datetime import datetime
 
11
 
12
 
13
  @dataclass
@@ -39,34 +41,20 @@ class UserFeedback:
39
  open_ended_feedback: Optional[str]
40
  score: int
41
  is_feedback_about_last_retrieval: bool
42
- retrieved_data: List[RetrievalEntry]
43
  conversation_id: str
44
  timestamp: float
45
  message_count: int
46
  has_retrievals: bool
47
  retrieval_count: int
48
- user_query: Optional[str] = None
49
- bot_response: Optional[str] = None
 
 
50
  created_at: str = field(default_factory=lambda: datetime.now().isoformat())
51
 
52
  def to_dict(self) -> Dict[str, Any]:
53
  """Convert to dictionary with nested data structures"""
54
  result = asdict(self)
55
- # Handle nested objects
56
- if self.retrieved_data:
57
- result['retrieved_data'] = [self._serialize_retrieval_entry(entry) for entry in self.retrieved_data]
58
- return result
59
-
60
- def _serialize_retrieval_entry(self, entry: RetrievalEntry) -> Dict[str, Any]:
61
- """Serialize retrieval entry to dict"""
62
- # If raw data exists, use it (it's already properly formatted)
63
- if hasattr(entry, '_raw_data') and entry._raw_data:
64
- return entry._raw_data
65
-
66
- # Otherwise, serialize the dataclass
67
- result = asdict(entry)
68
- if entry.documents_retrieved:
69
- result['documents_retrieved'] = [asdict(doc) for doc in entry.documents_retrieved]
70
  return result
71
 
72
  def to_snowflake_schema(self) -> Dict[str, Any]:
@@ -81,28 +69,28 @@ class UserFeedback:
81
  "message_count": "INTEGER",
82
  "has_retrievals": "BOOLEAN",
83
  "retrieval_count": "INTEGER",
84
- "user_query": "VARCHAR(16777216)",
85
- "bot_response": "VARCHAR(16777216)",
 
 
86
  "created_at": "TIMESTAMP_NTZ",
87
- "retrieved_data": "VARIANT", # Array of retrieval entries
88
- # retrieved_data structure:
89
- # [
90
  # {
91
- # "rag_query": "...",
92
- # "conversation_length": 5,
93
- # "timestamp": 1234567890,
94
- # "docs_retrieved": [
95
- # {"filename": "...", "page": 14, "score": 0.95, ...},
96
- # ...
97
- # ]
98
  # },
99
  # ...
100
  # ]
 
 
 
 
101
  }
102
  return schema
103
 
104
  @classmethod
105
- def get_snowflake_create_table_sql(cls, table_name: str = "user_feedback") -> str:
106
  """Generate CREATE TABLE SQL for Snowflake"""
107
  schema = cls.to_snowflake_schema(None)
108
 
@@ -117,16 +105,13 @@ class UserFeedback:
117
  sql = f"""CREATE TABLE IF NOT EXISTS {table_name} (
118
  {columns_str},
119
  PRIMARY KEY (feedback_id)
120
- );
121
-
122
- -- Create index on timestamp for querying by time
123
- CREATE INDEX IF NOT EXISTS idx_feedback_timestamp ON {table_name} (timestamp);
124
-
125
- -- Create index on conversation_id for querying by conversation
126
- CREATE INDEX IF NOT EXISTS idx_feedback_conversation ON {table_name} (conversation_id);
127
-
128
- -- Create index on score for feedback analysis
129
- CREATE INDEX IF NOT EXISTS idx_feedback_score ON {table_name} (score);
130
  """
131
  return sql
132
 
@@ -150,47 +135,27 @@ DOCUMENT_SCHEMA = {
150
  }
151
 
152
 
153
- def generate_snowflake_schema_sql() -> str:
154
  """Generate complete Snowflake schema SQL for feedback system"""
155
- return UserFeedback.get_snowflake_create_table_sql("user_feedback")
 
 
156
 
157
 
158
  def create_feedback_from_dict(data: Dict[str, Any]) -> UserFeedback:
159
  """Create UserFeedback instance from dictionary"""
160
- # Parse retrieved_data if present
161
- retrieved_data = []
162
- if "retrieved_data" in data and data["retrieved_data"]:
163
- for entry_dict in data.get("retrieved_data", []):
164
- # Map the actual structure from rag_retrieval_history
165
- # Entry has: conversation_up_to, rag_query_expansion, docs_retrieved
166
- try:
167
- # Try to map to expected structure
168
- entry = RetrievalEntry(
169
- rag_query=entry_dict.get("rag_query_expansion", ""),
170
- documents_retrieved=[], # Empty for now, will store as raw data
171
- conversation_length=len(entry_dict.get("conversation_up_to", [])),
172
- filters_applied=None,
173
- timestamp=entry_dict.get("timestamp", None)
174
- )
175
- # Store raw data in the entry
176
- entry._raw_data = entry_dict # Store original for preservation
177
- retrieved_data.append(entry)
178
- except Exception as e:
179
- # If mapping fails, store as-is without strict typing
180
- pass
181
-
182
  return UserFeedback(
183
  feedback_id=data.get("feedback_id", f"feedback_{data.get('timestamp', 'unknown')}"),
184
  open_ended_feedback=data.get("open_ended_feedback"),
185
  score=data["score"],
186
  is_feedback_about_last_retrieval=data["is_feedback_about_last_retrieval"],
187
- retrieved_data=retrieved_data,
188
  conversation_id=data["conversation_id"],
189
  timestamp=data["timestamp"],
190
  message_count=data["message_count"],
191
  has_retrievals=data["has_retrievals"],
192
  retrieval_count=data["retrieval_count"],
193
- user_query=data.get("user_query"),
194
- bot_response=data.get("bot_response")
 
 
195
  )
196
-
 
4
  This module defines dataclasses for feedback data structures
5
  and provides Snowflake schema generation.
6
  """
7
+ import os
8
+ from datetime import datetime
9
  from dataclasses import dataclass, asdict, field
10
  from typing import List, Optional, Dict, Any, Union
11
+
12
+
13
 
14
 
15
  @dataclass
 
41
  open_ended_feedback: Optional[str]
42
  score: int
43
  is_feedback_about_last_retrieval: bool
 
44
  conversation_id: str
45
  timestamp: float
46
  message_count: int
47
  has_retrievals: bool
48
  retrieval_count: int
49
+ transcript: List[Dict[str, str]] # List of {"role": "user"/"assistant", "content": "..."}
50
+ retrievals: List[Dict[str, Any]] # List of retrieval objects with retrieved_docs and user_message_trigger
51
+ feedback_score_related_retrieval_docs: Optional[Dict[str, Any]] = None # Conversation subset + retrieved docs
52
+ retrieved_data: Optional[List[Dict[str, Any]]] = None # Preserved old column for backward compatibility
53
  created_at: str = field(default_factory=lambda: datetime.now().isoformat())
54
 
55
  def to_dict(self) -> Dict[str, Any]:
56
  """Convert to dictionary with nested data structures"""
57
  result = asdict(self)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58
  return result
59
 
60
  def to_snowflake_schema(self) -> Dict[str, Any]:
 
69
  "message_count": "INTEGER",
70
  "has_retrievals": "BOOLEAN",
71
  "retrieval_count": "INTEGER",
72
+ "transcript": "VARCHAR(16777216)", # JSON string of ARRAY of {"role": "user"/"assistant", "content": "..."}
73
+ "retrievals": "VARCHAR(16777216)", # JSON string of ARRAY of retrieval objects
74
+ "feedback_score_related_retrieval_docs": "VARCHAR(16777216)", # JSON string of OBJECT with conversation subset + retrieved docs
75
+ "retrieved_data": "VARCHAR(16777216)", # JSON string - preserved old column for backward compatibility
76
  "created_at": "TIMESTAMP_NTZ",
77
+ # transcript structure: [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}, ...]
78
+ # retrievals structure: [
 
79
  # {
80
+ # "retrieved_docs": [{"content": "...", "metadata": {...}, ...}], # content truncated to 100 chars
81
+ # "user_message_trigger": "final user message that triggered this retrieval"
 
 
 
 
 
82
  # },
83
  # ...
84
  # ]
85
+ # feedback_score_related_retrieval_docs structure: {
86
+ # "conversation_up_to_point": [{"role": "user", "content": "..."}, ...], # subset of transcript
87
+ # "retrieved_docs": [{"content": "...", "metadata": {...}, ...}] # full chunks with all info
88
+ # }
89
  }
90
  return schema
91
 
92
  @classmethod
93
+ def get_snowflake_create_table_sql(cls, table_name: str = "USER_FEEDBACK_V3") -> str:
94
  """Generate CREATE TABLE SQL for Snowflake"""
95
  schema = cls.to_snowflake_schema(None)
96
 
 
105
  sql = f"""CREATE TABLE IF NOT EXISTS {table_name} (
106
  {columns_str},
107
  PRIMARY KEY (feedback_id)
108
+ )
109
+ CLUSTER BY (timestamp, conversation_id, score);
110
+ -- Note: Snowflake doesn't support traditional indexes on regular tables.
111
+ -- Instead, we use CLUSTER BY to optimize queries on these columns.
112
+ -- Snowflake automatically maintains clustering for efficient querying.
113
+ -- Note: transcript, retrievals, and feedback_score_related_retrieval_docs are stored as VARCHAR (JSON strings),
114
+ -- same approach as the old retrieved_data column. This allows easy storage and retrieval without VARIANT type complexity.
 
 
 
115
  """
116
  return sql
117
 
 
135
  }
136
 
137
 
138
+ def generate_snowflake_schema_sql(table_name: Optional[str] = None) -> str:
139
  """Generate complete Snowflake schema SQL for feedback system"""
140
+ if table_name is None:
141
+ table_name = os.getenv("SNOWFLAKE_FEEDBACK_TABLE", "USER_FEEDBACK_V3")
142
+ return UserFeedback.get_snowflake_create_table_sql(table_name)
143
 
144
 
145
  def create_feedback_from_dict(data: Dict[str, Any]) -> UserFeedback:
146
  """Create UserFeedback instance from dictionary"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
147
  return UserFeedback(
148
  feedback_id=data.get("feedback_id", f"feedback_{data.get('timestamp', 'unknown')}"),
149
  open_ended_feedback=data.get("open_ended_feedback"),
150
  score=data["score"],
151
  is_feedback_about_last_retrieval=data["is_feedback_about_last_retrieval"],
 
152
  conversation_id=data["conversation_id"],
153
  timestamp=data["timestamp"],
154
  message_count=data["message_count"],
155
  has_retrievals=data["has_retrievals"],
156
  retrieval_count=data["retrieval_count"],
157
+ transcript=data.get("transcript", []),
158
+ retrievals=data.get("retrievals", []),
159
+ feedback_score_related_retrieval_docs=data.get("feedback_score_related_retrieval_docs"),
160
+ retrieved_data=data.get("retrieved_data")
161
  )