# audit_assistant / src / reporting / snowflake_connector.py
# akryldigital's picture
# Commit message: add extra columns for feedback functionality
# Commit: 69de8d2 (verified)
"""
Snowflake Connector for Feedback System
This module handles inserting user feedback into Snowflake.
"""
import json
import logging
import os
import re
from typing import Dict, Any, Optional

from src.reporting.feedback_schema import UserFeedback
# Configure logging BEFORE anything in this module emits a record.
# The module-level helpers (logging.warning, logging.info, ...) implicitly call
# basicConfig() with default settings when the root logger has no handlers,
# and a later explicit basicConfig() is then a silent no-op. Emitting the
# "not installed" warning first would therefore discard the format below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Try to import the optional snowflake connector; the rest of the module
# checks SNOWFLAKE_AVAILABLE instead of failing at import time.
try:
    import snowflake.connector
    SNOWFLAKE_AVAILABLE = True
except ImportError:
    SNOWFLAKE_AVAILABLE = False
    logger.warning("⚠️ snowflake-connector-python not installed. Install with: pip install snowflake-connector-python")
class SnowflakeFeedbackConnector:
    """Connector for inserting user feedback records into Snowflake.

    The connection is opened without a database/schema; the context is set
    with ``USE DATABASE`` / ``USE SCHEMA`` on every insert instead.

    The class is usable as a context manager: entering calls ``connect()``
    and leaving calls ``disconnect()``.

    NOTE(review): no explicit ``commit()`` is issued after the INSERT, so
    persistence relies on the connection's autocommit setting -- confirm.
    """

    # One identifier segment: an unquoted identifier (letter/underscore first,
    # then letters, digits, underscores or dollar signs) or a double-quoted
    # identifier in which an embedded quote is written as "".
    _IDENTIFIER_PART = r'(?:[A-Za-z_][A-Za-z0-9_$]*|"(?:[^"]|"")+")'
    # A table name, optionally qualified as [database.][schema.]table.
    _TABLE_NAME_RE = re.compile(
        _IDENTIFIER_PART + r'(?:\.' + _IDENTIFIER_PART + r'){0,2}'
    )

    def __init__(
        self,
        user: str,
        password: str,
        account: str,
        warehouse: str,
        database: str = "SNOWFLAKE_LEARNING",
        schema: str = "PUBLIC"
    ):
        """Store connection settings; no connection is opened here.

        Args:
            user: Snowflake user name.
            password: Password for ``user``.
            account: Snowflake account identifier.
            warehouse: Warehouse passed to the connection.
            database: Database selected before each insert.
            schema: Schema selected before each insert.
        """
        self.user = user
        self.password = password
        self.account = account
        self.warehouse = warehouse
        self.database = database
        self.schema = schema
        self._connection = None

    def connect(self):
        """Establish the Snowflake connection.

        Raises:
            ImportError: If snowflake-connector-python is not installed.
            Exception: Any error raised by the connector is logged and
                re-raised unchanged.
        """
        if not SNOWFLAKE_AVAILABLE:
            raise ImportError("snowflake-connector-python is not installed. Install with: pip install snowflake-connector-python")

        logger.info("=" * 80)
        logger.info("πŸ”Œ SNOWFLAKE CONNECTION: Attempting to connect...")
        logger.info(f" - Account: {self.account}")
        logger.info(f" - Warehouse: {self.warehouse}")
        logger.info(f" - Database: {self.database}")
        logger.info(f" - Schema: {self.schema}")
        logger.info(f" - User: {self.user}")
        try:
            self._connection = snowflake.connector.connect(
                user=self.user,
                password=self.password,
                account=self.account,
                warehouse=self.warehouse
                # Don't set database/schema in connection - we'll do it per query
            )
            logger.info("βœ… SNOWFLAKE CONNECTION: Successfully connected")
            logger.info("=" * 80)
            print(f"βœ… Connected to Snowflake: {self.database}.{self.schema}")
        except Exception as e:
            logger.error(f"❌ SNOWFLAKE CONNECTION FAILED: {e}")
            logger.error("=" * 80)
            print(f"❌ Failed to connect to Snowflake: {e}")
            raise

    def disconnect(self):
        """Close the Snowflake connection if one is open.

        The stored connection is always cleared, even when ``close()``
        raises, so a closed connection is never mistaken for a live one and
        calling ``disconnect()`` twice is harmless.
        """
        if not self._connection:
            return
        try:
            self._connection.close()
            print("βœ… Disconnected from Snowflake")
        finally:
            self._connection = None

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted identifier with quotes escaped."""
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _json_or_none(value: Any) -> Optional[str]:
        """Serialize a truthy value to a JSON string; map empty/None to None."""
        return json.dumps(value) if value else None

    @staticmethod
    def _validate_feedback(feedback: UserFeedback) -> list:
        """Return a list of messages for required fields that are missing."""
        problems = []
        if not feedback.feedback_id:
            problems.append("Missing feedback_id")
        if feedback.score is None:
            problems.append("Missing score")
        if feedback.timestamp is None:
            problems.append("Missing timestamp")
        return problems

    def _set_context(self, cursor) -> None:
        """Select the configured database and schema and log the result."""
        logger.info(f"πŸ”§ SETTING CONTEXT: Database={self.database}, Schema={self.schema}")
        try:
            cursor.execute(f"USE DATABASE {self._quote_identifier(self.database)}")
            cursor.execute(f"USE SCHEMA {self._quote_identifier(self.schema)}")
            cursor.execute("SELECT CURRENT_DATABASE(), CURRENT_SCHEMA()")
            current_db, current_schema = cursor.fetchone()
            logger.info(f"βœ… Current context verified: Database={current_db}, Schema={current_schema}")
        except Exception as e:
            logger.error(f"❌ Could not set context: {e}")
            raise

    def _build_params(self, feedback: UserFeedback) -> Dict[str, Any]:
        """Build the bind parameters for the INSERT statement.

        The structured fields are passed as JSON strings.
        NOTE(review): the original comments describe the target columns both
        as VARIANT and as VARCHAR -- confirm the actual table definition.
        """
        logger.info("πŸ”§ DATA PREPARATION: Preparing VARIANT columns...")
        feedback_dict = feedback.to_dict()

        transcript_raw = feedback_dict.get('transcript', [])
        transcript_for_db = self._json_or_none(transcript_raw)
        if transcript_for_db is not None:
            logger.info(f" - Transcript: {len(transcript_raw)} messages, JSON length: {len(transcript_for_db)}")
        else:
            logger.info(" - Transcript: None")

        retrievals_raw = feedback_dict.get('retrievals', [])
        retrievals_for_db = self._json_or_none(retrievals_raw)
        if retrievals_for_db is not None:
            logger.info(f" - Retrievals: {len(retrievals_raw)} entries, JSON length: {len(retrievals_for_db)}")
        else:
            logger.info(" - Retrievals: None")

        feedback_score_related_for_db = self._json_or_none(
            feedback_dict.get('feedback_score_related_retrieval_docs')
        )
        if feedback_score_related_for_db is not None:
            logger.info(f" - Feedback score related docs: present, JSON length: {len(feedback_score_related_for_db)}")
        else:
            logger.info(" - Feedback score related docs: None")

        # retrieved_data is the column kept from the previous table layout.
        retrieved_data_for_db = self._json_or_none(feedback_dict.get('retrieved_data'))
        if retrieved_data_for_db is not None:
            logger.info(f" - Retrieved data (preserved): present, JSON length: {len(retrieved_data_for_db)}")
        else:
            logger.info(" - Retrieved data (preserved): None")

        return {
            'feedback_id': feedback.feedback_id,
            'open_ended_feedback': feedback.open_ended_feedback,
            'score': feedback.score,
            'is_feedback_about_last_retrieval': feedback.is_feedback_about_last_retrieval,
            'conversation_id': feedback.conversation_id,
            'timestamp': int(feedback.timestamp),
            'message_count': feedback.message_count,
            'has_retrievals': feedback.has_retrievals,
            'retrieval_count': feedback.retrieval_count,
            'transcript': transcript_for_db,  # JSON string
            'retrievals': retrievals_for_db,  # JSON string
            'feedback_score_related_retrieval_docs': feedback_score_related_for_db,  # JSON string
            'retrieved_data': retrieved_data_for_db,  # JSON string - preserved old column
            'created_at': feedback.created_at
        }

    def insert_feedback(self, feedback: UserFeedback, table_name: Optional[str] = None) -> bool:
        """Insert a single feedback record into Snowflake.

        Args:
            feedback: The record to store.
            table_name: Target table, optionally qualified. When omitted, the
                SNOWFLAKE_FEEDBACK_TABLE environment variable is used, falling
                back to "USER_FEEDBACK_V3".

        Returns:
            True when the INSERT was executed; False when the table name is
            not a valid identifier, a required field is missing, or any error
            occurs while talking to Snowflake (the error is logged).

        Raises:
            RuntimeError: If ``connect()`` has not been called.
        """
        logger.info("=" * 80)
        logger.info("πŸ”„ SNOWFLAKE INSERT: Starting feedback insertion process")
        logger.info(f"πŸ“ Feedback ID: {feedback.feedback_id}")

        # Get table name from parameter, env var, or default
        if table_name is None:
            table_name = os.getenv("SNOWFLAKE_FEEDBACK_TABLE", "USER_FEEDBACK_V3")

        if not self._connection:
            logger.error("❌ Not connected to Snowflake. Call connect() first.")
            raise RuntimeError("Not connected to Snowflake. Call connect() first.")

        cursor = None
        try:
            # The table name cannot be sent as a bind parameter, so it is
            # checked before being interpolated into the statement.
            if not self._TABLE_NAME_RE.fullmatch(table_name):
                raise ValueError(f"Invalid Snowflake table name: {table_name!r}")

            logger.info("πŸ“Š VALIDATION: Validating feedback data structure...")
            validation_errors = self._validate_feedback(feedback)
            if validation_errors:
                logger.error(f"❌ VALIDATION FAILED: {validation_errors}")
                return False
            logger.info("βœ… VALIDATION PASSED: All required fields present")

            logger.info("πŸ“‹ Data Summary:")
            logger.info(f" - Feedback ID: {feedback.feedback_id}")
            logger.info(f" - Score: {feedback.score}")
            logger.info(f" - Conversation ID: {feedback.conversation_id}")
            logger.info(f" - Has Retrievals: {feedback.has_retrievals}")
            logger.info(f" - Retrieval Count: {feedback.retrieval_count}")
            logger.info(f" - Message Count: {feedback.message_count}")
            logger.info(f" - Timestamp: {feedback.timestamp}")

            cursor = self._connection.cursor()
            logger.info("βœ… SNOWFLAKE CONNECTION: Cursor created")

            self._set_context(cursor)
            params = self._build_params(feedback)

            sql = f"""INSERT INTO {table_name} (
                feedback_id,
                open_ended_feedback,
                score,
                is_feedback_about_last_retrieval,
                conversation_id,
                timestamp,
                message_count,
                has_retrievals,
                retrieval_count,
                transcript,
                retrievals,
                feedback_score_related_retrieval_docs,
                retrieved_data,
                created_at
            ) VALUES (
                %(feedback_id)s, %(open_ended_feedback)s, %(score)s, %(is_feedback_about_last_retrieval)s,
                %(conversation_id)s, %(timestamp)s, %(message_count)s, %(has_retrievals)s,
                %(retrieval_count)s, %(transcript)s, %(retrievals)s, %(feedback_score_related_retrieval_docs)s,
                %(retrieved_data)s, %(created_at)s
            )"""
            logger.info("πŸ“ SQL PREPARATION: Building INSERT statement...")
            logger.info(f" - Target table: {table_name}")
            logger.info(f" - Database: {self.database}")
            logger.info(f" - Schema: {self.schema}")

            logger.info("πŸš€ SQL EXECUTION: Executing INSERT query...")
            cursor.execute(sql, params)
            logger.info("βœ… SQL EXECUTION: Query executed successfully")
            # Report what the driver says instead of assuming one row.
            logger.info(f" - Rows affected: {cursor.rowcount}")
            logger.info(" - Status: SUCCESS")

            logger.info("βœ… SNOWFLAKE INSERT: Feedback inserted successfully")
            logger.info(f"πŸ“ Inserted feedback: {feedback.feedback_id}")
            logger.info("=" * 80)
            return True
        except Exception as e:
            # Snowflake SQL errors carry an error code and SQL state worth logging.
            if SNOWFLAKE_AVAILABLE and isinstance(e, snowflake.connector.ProgrammingError):
                logger.error(f"❌ SQL EXECUTION ERROR: {e}")
                logger.error(f" - Error code: {getattr(e, 'errno', 'Unknown')}")
                logger.error(f" - SQL state: {getattr(e, 'sqlstate', 'Unknown')}")
            else:
                logger.error(f"❌ SNOWFLAKE INSERT FAILED: {type(e).__name__}")
                logger.error(f" - Error: {e}")
            logger.error("=" * 80)
            return False
        finally:
            # Release the cursor on every path, not only after a successful insert.
            if cursor is not None:
                try:
                    cursor.close()
                except Exception as close_error:
                    logger.warning(f"⚠️ Could not close cursor: {close_error}")

    def __enter__(self):
        """Context manager entry: open the connection and return self."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close the connection; exceptions propagate."""
        self.disconnect()
def get_snowflake_connector_from_env() -> Optional[SnowflakeFeedbackConnector]:
    """Build a SnowflakeFeedbackConnector from SNOWFLAKE_* environment variables.

    SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT and
    SNOWFLAKE_WAREHOUSE are required; SNOWFLAKE_DATABASE and SNOWFLAKE_SCHEMA
    are optional and fall back to "SNOWFLAKE_LEARN" and "PUBLIC".

    Returns:
        A connector that has not been connected yet, or None (after printing
        a notice) when any required variable is unset or empty.
    """
    credentials = {
        "user": os.getenv("SNOWFLAKE_USER"),
        "password": os.getenv("SNOWFLAKE_PASSWORD"),
        "account": os.getenv("SNOWFLAKE_ACCOUNT"),
        "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
    }

    # An unset or empty value counts as missing.
    if any(not value for value in credentials.values()):
        print("⚠️ Snowflake credentials not found in environment variables")
        print("Required variables: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE")
        return None

    # NOTE(review): this fallback database ("SNOWFLAKE_LEARN") differs from the
    # connector class default ("SNOWFLAKE_LEARNING") -- confirm which is intended.
    return SnowflakeFeedbackConnector(
        database=os.getenv("SNOWFLAKE_DATABASE", "SNOWFLAKE_LEARN"),
        schema=os.getenv("SNOWFLAKE_SCHEMA", "PUBLIC"),
        **credentials,
    )
def save_to_snowflake(feedback: UserFeedback, table_name: Optional[str] = None) -> bool:
    """Save one feedback record to Snowflake using credentials from the environment.

    Args:
        feedback: The record to store.
        table_name: Target table. When omitted, the SNOWFLAKE_FEEDBACK_TABLE
            environment variable is used, falling back to "USER_FEEDBACK_V3".

    Returns:
        True when the record was inserted; False when credentials are not
        configured, the connection cannot be established, or the insert
        fails. No exception is propagated to the caller.
    """
    logger.info("=" * 80)
    logger.info("πŸ”΅ SNOWFLAKE SAVE: Starting save process")
    logger.info(f"πŸ“ Feedback ID: {feedback.feedback_id}")

    # Get table name from parameter or env var
    if table_name is None:
        table_name = os.getenv("SNOWFLAKE_FEEDBACK_TABLE", "USER_FEEDBACK_V3")

    connector = get_snowflake_connector_from_env()
    if not connector:
        logger.warning("⚠️ SNOWFLAKE SAVE: Skipping insertion (credentials not configured)")
        logger.warning(" Required variables: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE")
        logger.info("=" * 80)
        return False

    try:
        logger.info("πŸ“‘ SNOWFLAKE SAVE: Establishing connection...")
        connector.connect()
        logger.info("βœ… SNOWFLAKE SAVE: Connection established")
    except Exception as e:
        logger.error(f"❌ SNOWFLAKE SAVE ERROR: {type(e).__name__}")
        logger.error(f" - Error: {e}")
        logger.info("=" * 80)
        return False

    success = False
    try:
        logger.info("πŸ“₯ SNOWFLAKE SAVE: Attempting to insert feedback...")
        success = connector.insert_feedback(feedback, table_name=table_name)
    except Exception as e:
        logger.error(f"❌ SNOWFLAKE SAVE ERROR: {type(e).__name__}")
        logger.error(f" - Error: {e}")
    finally:
        # Always release the connection. A failure while closing must not
        # turn an insert that already succeeded into a reported failure,
        # which could lead the caller to retry and store a duplicate.
        logger.info("πŸ”Œ SNOWFLAKE SAVE: Disconnecting...")
        try:
            connector.disconnect()
        except Exception as e:
            logger.warning(f"⚠️ SNOWFLAKE SAVE: Error while disconnecting: {type(e).__name__}: {e}")

    if success:
        logger.info("βœ… SNOWFLAKE SAVE: Successfully saved feedback")
    else:
        logger.error("❌ SNOWFLAKE SAVE: Failed to save feedback")
    logger.info("=" * 80)
    return success