"""
Snowflake Connector for Feedback System

This module handles inserting user feedback into Snowflake.
"""
import os
import json
import logging
from typing import Dict, Any, Optional

from .feedback_schema import UserFeedback

# Configure logging BEFORE the optional import below.  Emitting a record
# through the root logger first would implicitly install a default
# WARNING-level handler, after which this basicConfig() call is a no-op and
# every INFO message in this module is silently dropped.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Try to import snowflake connector
try:
    import snowflake.connector
    SNOWFLAKE_AVAILABLE = True
except ImportError:
    SNOWFLAKE_AVAILABLE = False
    logger.warning("⚠️ snowflake-connector-python not installed. Install with: pip install snowflake-connector-python")


class SnowflakeFeedbackConnector:
    """Connector for inserting feedback into Snowflake.

    Usable directly (``connect()`` / ``insert_feedback()`` / ``disconnect()``)
    or as a context manager, which connects on entry and disconnects on exit.
    """

    def __init__(
        self,
        user: str,
        password: str,
        account: str,
        warehouse: str,
        database: str = "SNOWFLAKE_LEARNING",
        schema: str = "PUBLIC"
    ):
        """Store the connection settings; no network I/O happens here."""
        self.user = user
        self.password = password
        self.account = account
        self.warehouse = warehouse
        self.database = database
        self.schema = schema
        self._connection = None

    def connect(self):
        """Establish Snowflake connection.

        Raises:
            ImportError: if snowflake-connector-python is not installed.
            Exception: whatever the connector raises on failure (re-raised
                after being logged and printed).
        """
        if not SNOWFLAKE_AVAILABLE:
            raise ImportError(
                "snowflake-connector-python is not installed. "
                "Install with: pip install snowflake-connector-python"
            )

        logger.info("=" * 80)
        logger.info("🔌 SNOWFLAKE CONNECTION: Attempting to connect...")
        logger.info(f" - Account: {self.account}")
        logger.info(f" - Warehouse: {self.warehouse}")
        logger.info(f" - Database: {self.database}")
        logger.info(f" - Schema: {self.schema}")
        logger.info(f" - User: {self.user}")

        try:
            self._connection = snowflake.connector.connect(
                user=self.user,
                password=self.password,
                account=self.account,
                warehouse=self.warehouse
                # Don't set database/schema in connection - we'll do it per query
            )
            logger.info("✅ SNOWFLAKE CONNECTION: Successfully connected")
            logger.info("=" * 80)
            print(f"✅ Connected to Snowflake: {self.database}.{self.schema}")
        except Exception as e:
            logger.error(f"❌ SNOWFLAKE CONNECTION FAILED: {e}")
            logger.error("=" * 80)
            print(f"❌ Failed to connect to Snowflake: {e}")
            raise

    def disconnect(self):
        """Close Snowflake connection (no-op when not connected)."""
        if self._connection:
            try:
                self._connection.close()
            finally:
                # Drop the handle even if close() fails so that a later
                # insert_feedback() hits the "not connected" guard instead of
                # operating on a closed connection.
                self._connection = None
            print("✅ Disconnected from Snowflake")

    @staticmethod
    def _validation_errors(feedback: UserFeedback) -> list:
        """Return the list of missing-required-field messages (empty if valid)."""
        errors = []
        if not feedback.feedback_id:
            errors.append("Missing feedback_id")
        if feedback.score is None:
            errors.append("Missing score")
        if feedback.timestamp is None:
            errors.append("Missing timestamp")
        return errors

    @staticmethod
    def _to_json_column(label: str, raw: Any, unit: Optional[str] = None) -> Optional[str]:
        """Serialize *raw* to a JSON string for storage, or None when empty.

        ``unit`` (e.g. "messages") makes the log line report ``len(raw)``;
        without it the value is just reported as "present".
        """
        if not raw:
            logger.info(f" - {label}: None")
            return None
        encoded = json.dumps(raw)
        detail = f"{len(raw)} {unit}" if unit else "present"
        logger.info(f" - {label}: {detail}, JSON length: {len(encoded)}")
        return encoded

    def _use_context(self, cursor) -> None:
        """Switch the session to this connector's database/schema and verify it."""
        logger.info(f"🔧 SETTING CONTEXT: Database={self.database}, Schema={self.schema}")
        try:
            cursor.execute(f'USE DATABASE "{self.database}"')
            cursor.execute(f'USE SCHEMA "{self.schema}"')
            cursor.execute("SELECT CURRENT_DATABASE(), CURRENT_SCHEMA()")
            current_db, current_schema = cursor.fetchone()
            logger.info(f"✅ Current context verified: Database={current_db}, Schema={current_schema}")
        except Exception as e:
            logger.error(f"❌ Could not set context: {e}")
            raise

    def insert_feedback(self, feedback: UserFeedback, table_name: Optional[str] = None) -> bool:
        """Insert a single feedback record into Snowflake.

        Args:
            feedback: The record to store.
            table_name: Target table. Falls back to the
                ``SNOWFLAKE_FEEDBACK_TABLE`` environment variable, then to
                ``USER_FEEDBACK_V3``.

        Returns:
            True when the INSERT executed; False when validation failed or
            any error occurred (errors are logged, not raised).

        Raises:
            RuntimeError: if ``connect()`` has not been called.
        """
        logger.info("=" * 80)
        logger.info("🔄 SNOWFLAKE INSERT: Starting feedback insertion process")
        logger.info(f"📝 Feedback ID: {feedback.feedback_id}")

        # Get table name from parameter, env var, or default
        if table_name is None:
            table_name = os.getenv("SNOWFLAKE_FEEDBACK_TABLE", "USER_FEEDBACK_V3")

        if not self._connection:
            logger.error("❌ Not connected to Snowflake. Call connect() first.")
            raise RuntimeError("Not connected to Snowflake. Call connect() first.")

        cursor = None
        try:
            logger.info("📊 VALIDATION: Validating feedback data structure...")
            validation_errors = self._validation_errors(feedback)
            if validation_errors:
                logger.error(f"❌ VALIDATION FAILED: {validation_errors}")
                return False
            logger.info("✅ VALIDATION PASSED: All required fields present")

            logger.info("📋 Data Summary:")
            logger.info(f" - Feedback ID: {feedback.feedback_id}")
            logger.info(f" - Score: {feedback.score}")
            logger.info(f" - Conversation ID: {feedback.conversation_id}")
            logger.info(f" - Has Retrievals: {feedback.has_retrievals}")
            logger.info(f" - Retrieval Count: {feedback.retrieval_count}")
            logger.info(f" - Message Count: {feedback.message_count}")
            logger.info(f" - Timestamp: {feedback.timestamp}")

            cursor = self._connection.cursor()
            logger.info("✅ SNOWFLAKE CONNECTION: Cursor created")

            # Set database and schema context
            self._use_context(cursor)

            # Structured fields are sent as JSON strings.
            # NOTE(review): the original comments call the target columns both
            # VARIANT and VARCHAR - confirm the actual column types.
            logger.info("🔧 DATA PREPARATION: Preparing VARIANT columns...")
            feedback_dict: Dict[str, Any] = feedback.to_dict()
            transcript_for_db = self._to_json_column(
                "Transcript", feedback_dict.get('transcript', []), "messages")
            retrievals_for_db = self._to_json_column(
                "Retrievals", feedback_dict.get('retrievals', []), "entries")
            feedback_score_related_for_db = self._to_json_column(
                "Feedback score related docs",
                feedback_dict.get('feedback_score_related_retrieval_docs'))
            retrieved_data_for_db = self._to_json_column(
                "Retrieved data (preserved)", feedback_dict.get('retrieved_data'))

            # NOTE(review): table_name is interpolated into the SQL text (an
            # identifier cannot be a bind parameter); it must only ever come
            # from trusted configuration, never from end-user input.
            sql = f"""INSERT INTO {table_name} (
                feedback_id,
                open_ended_feedback,
                score,
                is_feedback_about_last_retrieval,
                conversation_id,
                timestamp,
                message_count,
                has_retrievals,
                retrieval_count,
                transcript,
                retrievals,
                feedback_score_related_retrieval_docs,
                retrieved_data,
                created_at
            ) VALUES (
                %(feedback_id)s,
                %(open_ended_feedback)s,
                %(score)s,
                %(is_feedback_about_last_retrieval)s,
                %(conversation_id)s,
                %(timestamp)s,
                %(message_count)s,
                %(has_retrievals)s,
                %(retrieval_count)s,
                %(transcript)s,
                %(retrievals)s,
                %(feedback_score_related_retrieval_docs)s,
                %(retrieved_data)s,
                %(created_at)s
            )"""

            logger.info("📝 SQL PREPARATION: Building INSERT statement...")
            logger.info(f" - Target table: {table_name}")
            logger.info(f" - Database: {self.database}")
            logger.info(f" - Schema: {self.schema}")

            # All values travel as bind parameters.
            params = {
                'feedback_id': feedback.feedback_id,
                'open_ended_feedback': feedback.open_ended_feedback,
                'score': feedback.score,
                'is_feedback_about_last_retrieval': feedback.is_feedback_about_last_retrieval,
                'conversation_id': feedback.conversation_id,
                'timestamp': int(feedback.timestamp),
                'message_count': feedback.message_count,
                'has_retrievals': feedback.has_retrievals,
                'retrieval_count': feedback.retrieval_count,
                'transcript': transcript_for_db,  # JSON string
                'retrievals': retrievals_for_db,  # JSON string
                'feedback_score_related_retrieval_docs': feedback_score_related_for_db,  # JSON string
                'retrieved_data': retrieved_data_for_db,  # JSON string - preserved old column
                'created_at': feedback.created_at
            }

            # Execute insert
            logger.info("🚀 SQL EXECUTION: Executing INSERT query...")
            cursor.execute(sql, params)
            logger.info("✅ SQL EXECUTION: Query executed successfully")
            # Report what the driver says rather than a hard-coded 1.
            logger.info(f" - Rows affected: {cursor.rowcount}")
            logger.info(" - Status: SUCCESS")

            logger.info("✅ SNOWFLAKE INSERT: Feedback inserted successfully")
            logger.info(f"📝 Inserted feedback: {feedback.feedback_id}")
            logger.info("=" * 80)
            return True

        except Exception as e:
            # Snowflake SQL errors carry an error code and SQL state worth logging.
            if SNOWFLAKE_AVAILABLE and isinstance(e, snowflake.connector.errors.ProgrammingError):
                logger.error(f"❌ SQL EXECUTION ERROR: {e}")
                logger.error(f" - Error code: {getattr(e, 'errno', 'Unknown')}")
                logger.error(f" - SQL state: {getattr(e, 'sqlstate', 'Unknown')}")
            else:
                logger.error(f"❌ SNOWFLAKE INSERT FAILED: {type(e).__name__}")
                logger.error(f" - Error: {e}")
            logger.error("=" * 80)
            return False

        finally:
            # Always release the cursor, including on failure paths; a close
            # error must not mask the insert's real outcome.
            if cursor is not None:
                try:
                    cursor.close()
                except Exception as close_error:
                    logger.warning(f"⚠️ Could not close cursor: {close_error}")

    def __enter__(self):
        """Context manager entry"""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        self.disconnect()


def get_snowflake_connector_from_env() -> Optional[SnowflakeFeedbackConnector]:
    """Create Snowflake connector from environment variables.

    Reads SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT and
    SNOWFLAKE_WAREHOUSE (all required) plus the optional SNOWFLAKE_DATABASE
    and SNOWFLAKE_SCHEMA.

    Returns:
        A connector (not yet connected), or None when a required variable is
        missing or empty.
    """
    user = os.getenv("SNOWFLAKE_USER")
    password = os.getenv("SNOWFLAKE_PASSWORD")
    account = os.getenv("SNOWFLAKE_ACCOUNT")
    warehouse = os.getenv("SNOWFLAKE_WAREHOUSE")
    # NOTE(review): this default ("SNOWFLAKE_LEARN") differs from the class
    # default ("SNOWFLAKE_LEARNING") - confirm which one is intended.
    database = os.getenv("SNOWFLAKE_DATABASE", "SNOWFLAKE_LEARN")
    schema = os.getenv("SNOWFLAKE_SCHEMA", "PUBLIC")

    if not all([user, password, account, warehouse]):
        print("⚠️ Snowflake credentials not found in environment variables")
        print("Required variables: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE")
        return None

    return SnowflakeFeedbackConnector(
        user=user,
        password=password,
        account=account,
        warehouse=warehouse,
        database=database,
        schema=schema
    )


def save_to_snowflake(feedback: UserFeedback, table_name: Optional[str] = None) -> bool:
    """Helper function to save feedback to Snowflake.

    Builds a connector from the environment, connects, inserts *feedback*
    and disconnects.

    Returns:
        True when the record was inserted; False when credentials are not
        configured or any step failed (failures are logged, never raised).
    """
    logger.info("=" * 80)
    logger.info("🔵 SNOWFLAKE SAVE: Starting save process")
    logger.info(f"📝 Feedback ID: {feedback.feedback_id}")

    # Get table name from parameter or env var
    if table_name is None:
        table_name = os.getenv("SNOWFLAKE_FEEDBACK_TABLE", "USER_FEEDBACK_V3")

    connector = get_snowflake_connector_from_env()
    if not connector:
        logger.warning("⚠️ SNOWFLAKE SAVE: Skipping insertion (credentials not configured)")
        logger.warning(" Required variables: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE")
        logger.info("=" * 80)
        return False

    try:
        logger.info("📡 SNOWFLAKE SAVE: Establishing connection...")
        connector.connect()
        logger.info("✅ SNOWFLAKE SAVE: Connection established")

        try:
            logger.info("📥 SNOWFLAKE SAVE: Attempting to insert feedback...")
            success = connector.insert_feedback(feedback, table_name=table_name)
        finally:
            # Release the connection no matter how the insert ended.
            logger.info("🔌 SNOWFLAKE SAVE: Disconnecting...")
            connector.disconnect()

        if success:
            logger.info("✅ SNOWFLAKE SAVE: Successfully saved feedback")
        else:
            logger.error("❌ SNOWFLAKE SAVE: Failed to save feedback")
        logger.info("=" * 80)
        return success

    except Exception as e:
        logger.error(f"❌ SNOWFLAKE SAVE ERROR: {type(e).__name__}")
        logger.error(f" - Error: {e}")
        logger.info("=" * 80)
        return False